Author: jlowe Date: Fri Dec 21 18:24:25 2012 New Revision: 1425071 URL: http://svn.apache.org/viewvc?rev=1425071&view=rev Log: MAPREDUCE-4842. Shuffle race can hang reducer. Contributed by Mariappan Asokan
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1425071&r1=1425070&r2=1425071&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Dec 21 18:24:25 2012 @@ -630,6 +630,8 @@ Release 0.23.6 - UNRELEASED MAPREDUCE-4836. Elapsed time for running tasks on AM web UI tasks page is 0 (Ravi Prakash via jeagles) + MAPREDUCE-4842. Shuffle race can hang reducer (Mariappan Asokan via jlowe) + Release 0.23.5 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1425071&r1=1425070&r2=1425071&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Fri Dec 21 18:24:25 2012 @@ -58,7 +58,9 @@ import org.apache.hadoop.mapreduce.task. import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.ReflectionUtils; -@SuppressWarnings(value={"unchecked", "deprecation"}) +import com.google.common.annotations.VisibleForTesting; + +@SuppressWarnings(value={"unchecked"}) @InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceStability.Unstable public class MergeManager<K, V> { @@ -85,7 +87,7 @@ public class MergeManager<K, V> { Set<MapOutput<K, V>> inMemoryMapOutputs = new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>()); - private final InMemoryMerger inMemoryMerger; + private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger; Set<Path> onDiskMapOutputs = new TreeSet<Path>(); private final OnDiskMerger onDiskMerger; @@ -179,6 +181,8 @@ public class MergeManager<K, V> { + singleShuffleMemoryLimitPercent); } + usedMemory = 0L; + commitMemory = 0L; this.maxSingleShuffleLimit = (long)(memoryLimit * singleShuffleMemoryLimitPercent); this.memToMemMergeOutputsThreshold = @@ -210,7 +214,7 @@ public class MergeManager<K, V> { this.memToMemMerger = null; } - this.inMemoryMerger = new InMemoryMerger(this); + this.inMemoryMerger = createInMemoryMerger(); this.inMemoryMerger.start(); this.onDiskMerger = new OnDiskMerger(this); @@ -219,11 +223,19 @@ public class MergeManager<K, V> { this.mergePhase = mergePhase; } + protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() { + return new InMemoryMerger(this); + } TaskAttemptID getReduceId() { return reduceId; } + @VisibleForTesting + ExceptionReporter getExceptionReporter() { + return exceptionReporter; + } + public void waitForInMemoryMerge() throws InterruptedException { inMemoryMerger.waitForMerge(); } @@ -288,7 +300,6 @@ public class MergeManager<K, V> { } synchronized void unreserve(long size) { - commitMemory -= size; usedMemory -= size; } @@ -300,24 +311,20 @@ public class MergeManager<K, V> { commitMemory+= mapOutput.getSize(); - synchronized (inMemoryMerger) { - // Can hang if mergeThreshold is really low. - if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) { - LOG.info("Starting inMemoryMerger's merge since commitMemory=" + - commitMemory + " > mergeThreshold=" + mergeThreshold + - ". Current usedMemory=" + usedMemory); - inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs); - inMemoryMergedMapOutputs.clear(); - inMemoryMerger.startMerge(inMemoryMapOutputs); - } + // Can hang if mergeThreshold is really low. + if (commitMemory >= mergeThreshold) { + LOG.info("Starting inMemoryMerger's merge since commitMemory=" + + commitMemory + " > mergeThreshold=" + mergeThreshold + + ". Current usedMemory=" + usedMemory); + inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs); + inMemoryMergedMapOutputs.clear(); + inMemoryMerger.startMerge(inMemoryMapOutputs); + commitMemory = 0L; // Reset commitMemory. } if (memToMemMerger != null) { - synchronized (memToMemMerger) { - if (!memToMemMerger.isInProgress() && - inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) { - memToMemMerger.startMerge(inMemoryMapOutputs); - } + if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) { + memToMemMerger.startMerge(inMemoryMapOutputs); } } } @@ -333,11 +340,8 @@ public class MergeManager<K, V> { public synchronized void closeOnDiskFile(Path file) { onDiskMapOutputs.add(file); - synchronized (onDiskMerger) { - if (!onDiskMerger.isInProgress() && - onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) { - onDiskMerger.startMerge(onDiskMapOutputs); - } + if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) { + onDiskMerger.startMerge(onDiskMapOutputs); } } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java?rev=1425071&r1=1425070&r2=1425071&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java Fri Dec 21 18:24:25 2012 @@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.task import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,8 +32,8 @@ abstract class MergeThread<T,K,V> extend private static final Log LOG = LogFactory.getLog(MergeThread.class); - private volatile boolean inProgress = false; - private List<T> inputs = new ArrayList<T>(); + private AtomicInteger numPending = new AtomicInteger(0); + private LinkedList<List<T>> pendingToBeMerged; protected final MergeManager<K,V> manager; private final ExceptionReporter reporter; private boolean closed = false; @@ -39,6 +41,7 @@ abstract class MergeThread<T,K,V> extend public MergeThread(MergeManager<K,V> manager, int mergeFactor, ExceptionReporter reporter) { + this.pendingToBeMerged = new LinkedList<List<T>>(); this.manager = manager; this.mergeFactor = mergeFactor; this.reporter = reporter; @@ -50,53 +53,55 @@ abstract class MergeThread<T,K,V> extend interrupt(); } - public synchronized boolean isInProgress() { - return inProgress; - } - - public synchronized void startMerge(Set<T> inputs) { + public void startMerge(Set<T> inputs) { if (!closed) { - inProgress = true; - this.inputs = new ArrayList<T>(); + numPending.incrementAndGet(); + List<T> toMergeInputs = new ArrayList<T>(); Iterator<T> iter=inputs.iterator(); for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) { - this.inputs.add(iter.next()); + toMergeInputs.add(iter.next()); iter.remove(); } - LOG.info(getName() + ": Starting merge with " + this.inputs.size() + + LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() + " segments, while ignoring " + inputs.size() + " segments"); - notifyAll(); + synchronized(pendingToBeMerged) { + pendingToBeMerged.addLast(toMergeInputs); + pendingToBeMerged.notifyAll(); + } } } public synchronized void waitForMerge() throws InterruptedException { - while (inProgress) { + while (numPending.get() > 0) { wait(); } } public void run() { while (true) { + List<T> inputs = null; try { // Wait for notification to start the merge... - synchronized (this) { - while (!inProgress) { - wait(); + synchronized (pendingToBeMerged) { + while(pendingToBeMerged.size() <= 0) { + pendingToBeMerged.wait(); } + // Pickup the inputs to merge. + inputs = pendingToBeMerged.removeFirst(); } // Merge merge(inputs); } catch (InterruptedException ie) { + numPending.set(0); return; } catch(Throwable t) { + numPending.set(0); reporter.reportException(t); return; } finally { synchronized (this) { - // Clear inputs - inputs = null; - inProgress = false; + numPending.decrementAndGet(); notifyAll(); } } Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java?rev=1425071&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java Fri Dec 21 18:24:25 2012 @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.task.reduce; + +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.io.BoundedByteArrayOutputStream; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapOutputFile; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type; +import org.junit.Assert; +import org.junit.Test; + +public class TestMergeManager { + + @Test(timeout=10000) + public void testMemoryMerge() throws Exception { + final int TOTAL_MEM_BYTES = 10000; + final int OUTPUT_SIZE = 7950; + JobConf conf = new JobConf(); + conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f); + conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, TOTAL_MEM_BYTES); + conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.8f); + conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 0.9f); + TestExceptionReporter reporter = new TestExceptionReporter(); + CyclicBarrier mergeStart = new CyclicBarrier(2); + CyclicBarrier mergeComplete = new CyclicBarrier(2); + StubbedMergeManager mgr = new StubbedMergeManager(conf, reporter, + mergeStart, mergeComplete); + + // reserve enough map output to cause a merge when it is committed + MapOutput<Text, Text> out1 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be a memory merge", + Type.MEMORY, out1.getType()); + fillOutput(out1); + MapOutput<Text, Text> out2 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be a memory merge", + Type.MEMORY, out2.getType()); + fillOutput(out2); + + // next reservation should be a WAIT + MapOutput<Text, Text> out3 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be told to wait", + Type.WAIT, out3.getType()); + + // trigger the first merge and wait for merge thread to start merging + // and free enough output to reserve more + out1.commit(); + out2.commit(); + mergeStart.await(); + + Assert.assertEquals(1, mgr.getNumMerges()); + + // reserve enough map output to cause another merge when committed + out1 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be a memory merge", + Type.MEMORY, out1.getType()); + fillOutput(out1); + out2 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be a memory merge", + Type.MEMORY, out2.getType()); + fillOutput(out2); + + // next reservation should be a WAIT + out3 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be told to wait", + Type.WAIT, out3.getType()); + + // commit output *before* merge thread completes + out1.commit(); + out2.commit(); + + // allow the first merge to complete + mergeComplete.await(); + + // start the second merge and verify + mergeStart.await(); + Assert.assertEquals(2, mgr.getNumMerges()); + + // trigger the end of the second merge + mergeComplete.await(); + + Assert.assertEquals(2, mgr.getNumMerges()); + Assert.assertEquals("exception reporter invoked", + 0, reporter.getNumExceptions()); + } + + private void fillOutput(MapOutput<Text, Text> output) throws IOException { + BoundedByteArrayOutputStream stream = output.getArrayStream(); + int count = stream.getLimit(); + for (int i=0; i < count; ++i) { + stream.write(i); + } + } + + private static class StubbedMergeManager extends MergeManager<Text, Text> { + private TestMergeThread mergeThread; + + public StubbedMergeManager(JobConf conf, ExceptionReporter reporter, + CyclicBarrier mergeStart, CyclicBarrier mergeComplete) { + super(null, conf, mock(LocalFileSystem.class), null, null, null, null, + null, null, null, null, reporter, null, mock(MapOutputFile.class)); + mergeThread.setSyncBarriers(mergeStart, mergeComplete); + } + + @Override + protected MergeThread<MapOutput<Text, Text>, Text, Text> createInMemoryMerger() { + mergeThread = new TestMergeThread(this, getExceptionReporter()); + return mergeThread; + } + + public int getNumMerges() { + return mergeThread.getNumMerges(); + } + } + + private static class TestMergeThread + extends MergeThread<MapOutput<Text,Text>, Text, Text> { + private AtomicInteger numMerges; + private CyclicBarrier mergeStart; + private CyclicBarrier mergeComplete; + + public TestMergeThread(MergeManager<Text, Text> mergeManager, + ExceptionReporter reporter) { + super(mergeManager, Integer.MAX_VALUE, reporter); + numMerges = new AtomicInteger(0); + } + + public synchronized void setSyncBarriers( + CyclicBarrier mergeStart, CyclicBarrier mergeComplete) { + this.mergeStart = mergeStart; + this.mergeComplete = mergeComplete; + } + + public int getNumMerges() { + return numMerges.get(); + } + + @Override + public void merge(List<MapOutput<Text, Text>> inputs) + throws IOException { + synchronized (this) { + numMerges.incrementAndGet(); + for (MapOutput<Text, Text> input : inputs) { + manager.unreserve(input.getSize()); + } + } + + try { + mergeStart.await(); + mergeComplete.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } + } + } + + private static class TestExceptionReporter implements ExceptionReporter { + private List<Throwable> exceptions = new ArrayList<Throwable>(); + + @Override + public void reportException(Throwable t) { + exceptions.add(t); + t.printStackTrace(); + } + + public int getNumExceptions() { + return exceptions.size(); + } + } +}