Author: jlowe
Date: Fri Dec 21 18:32:57 2012
New Revision: 1425074

URL: http://svn.apache.org/viewvc?rev=1425074&view=rev
Log:
svn merge -c 1425071 FIXES: MAPREDUCE-4842. Shuffle race can hang reducer. Contributed by Mariappan Asokan

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
      - copied unchanged from r1425071, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1425074&r1=1425073&r2=1425074&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Dec 21 18:32:57 2012
@@ -473,6 +473,8 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4836. Elapsed time for running tasks on AM web UI tasks page is 0
     (Ravi Prakash via jeagles)
 
+    MAPREDUCE-4842. Shuffle race can hang reducer (Mariappan Asokan via jlowe)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1425074&r1=1425073&r2=1425074&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Fri Dec 21 18:32:57 2012
@@ -58,7 +58,9 @@ import org.apache.hadoop.mapreduce.task.
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 
-@SuppressWarnings(value={"unchecked", "deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
+@SuppressWarnings(value={"unchecked"})
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class MergeManager<K, V> {
@@ -85,7 +87,7 @@ public class MergeManager<K, V> {
   Set<MapOutput<K, V>> inMemoryMapOutputs = 
     new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
-  private final InMemoryMerger inMemoryMerger;
+  private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger;
   
   Set<Path> onDiskMapOutputs = new TreeSet<Path>();
   private final OnDiskMerger onDiskMerger;
@@ -179,6 +181,8 @@ public class MergeManager<K, V> {
           + singleShuffleMemoryLimitPercent);
     }
 
+    usedMemory = 0L;
+    commitMemory = 0L;
     this.maxSingleShuffleLimit = 
       (long)(memoryLimit * singleShuffleMemoryLimitPercent);
     this.memToMemMergeOutputsThreshold = 
@@ -210,7 +214,7 @@ public class MergeManager<K, V> {
       this.memToMemMerger = null;
     }
 
-    this.inMemoryMerger = new InMemoryMerger(this);
+    this.inMemoryMerger = createInMemoryMerger();
     this.inMemoryMerger.start();
 
     this.onDiskMerger = new OnDiskMerger(this);
@@ -219,11 +223,19 @@ public class MergeManager<K, V> {
     this.mergePhase = mergePhase;
   }
 
+  protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() {
+    return new InMemoryMerger(this);
+  }
 
   TaskAttemptID getReduceId() {
     return reduceId;
   }
 
+  @VisibleForTesting
+  ExceptionReporter getExceptionReporter() {
+    return exceptionReporter;
+  }
+
   public void waitForInMemoryMerge() throws InterruptedException {
     inMemoryMerger.waitForMerge();
   }
@@ -288,7 +300,6 @@ public class MergeManager<K, V> {
   }
 
   synchronized void unreserve(long size) {
-    commitMemory -= size;
     usedMemory -= size;
   }
 
@@ -300,24 +311,20 @@ public class MergeManager<K, V> {
 
     commitMemory+= mapOutput.getSize();
 
-    synchronized (inMemoryMerger) {
-      // Can hang if mergeThreshold is really low.
-      if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
-        LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
-            commitMemory + " > mergeThreshold=" + mergeThreshold + 
-            ". Current usedMemory=" + usedMemory);
-        inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
-        inMemoryMergedMapOutputs.clear();
-        inMemoryMerger.startMerge(inMemoryMapOutputs);
-      }
+    // Can hang if mergeThreshold is really low.
+    if (commitMemory >= mergeThreshold) {
+      LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+          commitMemory + " > mergeThreshold=" + mergeThreshold + 
+          ". Current usedMemory=" + usedMemory);
+      inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
+      inMemoryMergedMapOutputs.clear();
+      inMemoryMerger.startMerge(inMemoryMapOutputs);
+      commitMemory = 0L;  // Reset commitMemory.
     }
 
     if (memToMemMerger != null) {
-      synchronized (memToMemMerger) {
-        if (!memToMemMerger.isInProgress() &&
-            inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
-          memToMemMerger.startMerge(inMemoryMapOutputs);
-        }
+      if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
+        memToMemMerger.startMerge(inMemoryMapOutputs);
       }
     }
   }
@@ -333,11 +340,8 @@ public class MergeManager<K, V> {
 
   public synchronized void closeOnDiskFile(Path file) {
     onDiskMapOutputs.add(file);
-
-    synchronized (onDiskMerger) {
-      if (!onDiskMerger.isInProgress() && 
-          onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
-        onDiskMerger.startMerge(onDiskMapOutputs);
-      }
+
+    if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
+      onDiskMerger.startMerge(onDiskMapOutputs);
     }
   }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java?rev=1425074&r1=1425073&r2=1425074&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java Fri Dec 21 18:32:57 2012
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.task
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,8 +32,8 @@ abstract class MergeThread<T,K,V> extend
 
   private static final Log LOG = LogFactory.getLog(MergeThread.class);
 
-  private volatile boolean inProgress = false;
-  private List<T> inputs = new ArrayList<T>();
+  private AtomicInteger numPending = new AtomicInteger(0);
+  private LinkedList<List<T>> pendingToBeMerged;
   protected final MergeManager<K,V> manager;
   private final ExceptionReporter reporter;
   private boolean closed = false;
@@ -39,6 +41,7 @@ abstract class MergeThread<T,K,V> extend
 
   public MergeThread(MergeManager<K,V> manager, int mergeFactor,
                      ExceptionReporter reporter) {
+    this.pendingToBeMerged = new LinkedList<List<T>>();
    this.manager = manager;
    this.mergeFactor = mergeFactor;
    this.reporter = reporter;
@@ -50,53 +53,55 @@ abstract class MergeThread<T,K,V> extend
     interrupt();
   }

-  public synchronized boolean isInProgress() {
-    return inProgress;
-  }
-
-  public synchronized void startMerge(Set<T> inputs) {
+  public void startMerge(Set<T> inputs) {
     if (!closed) {
-      inProgress = true;
-      this.inputs = new ArrayList<T>();
+      numPending.incrementAndGet();
+      List<T> toMergeInputs = new ArrayList<T>();
       Iterator<T> iter=inputs.iterator();
       for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
-        this.inputs.add(iter.next());
+        toMergeInputs.add(iter.next());
         iter.remove();
       }
-      LOG.info(getName() + ": Starting merge with " + this.inputs.size() + 
+      LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() + 
               " segments, while ignoring " +
               inputs.size() + " segments");
-      notifyAll();
+      synchronized(pendingToBeMerged) {
+        pendingToBeMerged.addLast(toMergeInputs);
+        pendingToBeMerged.notifyAll();
+      }
     }
   }

   public synchronized void waitForMerge() throws InterruptedException {
-    while (inProgress) {
+    while (numPending.get() > 0) {
       wait();
     }
   }

   public void run() {
     while (true) {
+      List<T> inputs = null;
       try {
         // Wait for notification to start the merge...
-        synchronized (this) {
-          while (!inProgress) {
-            wait();
+        synchronized (pendingToBeMerged) {
+          while(pendingToBeMerged.size() <= 0) {
+            pendingToBeMerged.wait();
           }
+          // Pickup the inputs to merge.
+          inputs = pendingToBeMerged.removeFirst();
         }
         // Merge
         merge(inputs);
       } catch (InterruptedException ie) {
+        numPending.set(0);
         return;
       } catch(Throwable t) {
+        numPending.set(0);
         reporter.reportException(t);
         return;
       } finally {
         synchronized (this) {
-          // Clear inputs
-          inputs = null;
-          inProgress = false;
+          numPending.decrementAndGet();
           notifyAll();
         }
       }
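
The essence of the MergeThread change above is replacing the single inProgress flag, which let MergeManager's !isInProgress() guard silently skip a merge request while the merger was busy, with a queue of pending input batches plus a count of outstanding merges, so every startMerge() call is eventually serviced. The standalone sketch below illustrates that hand-off pattern; the names PendingMerger and processBatch are invented for the sketch and are not Hadoop classes or methods, and the code is an approximation of the idea rather than the patched code itself.

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: PendingMerger and processBatch are invented names,
// not part of Hadoop; this mirrors the queue-and-counter hand-off shape.
public class PendingMerger<T> extends Thread {

  // Batches handed off by the producer and not yet merged.
  private final LinkedList<List<T>> pendingToBeMerged = new LinkedList<List<T>>();
  // Batches accepted but not yet fully processed.
  private final AtomicInteger numPending = new AtomicInteger(0);

  // Producer side: always enqueue; never drop a request.
  public void startMerge(List<T> inputs) {
    numPending.incrementAndGet();
    List<T> batch = new ArrayList<T>(inputs);
    synchronized (pendingToBeMerged) {
      pendingToBeMerged.addLast(batch);
      pendingToBeMerged.notifyAll();
    }
  }

  // Blocks until every accepted batch has been merged.
  public synchronized void waitForMerge() throws InterruptedException {
    while (numPending.get() > 0) {
      wait();
    }
  }

  @Override
  public void run() {
    while (true) {
      List<T> inputs;
      try {
        // Sleep until the producer enqueues a batch.
        synchronized (pendingToBeMerged) {
          while (pendingToBeMerged.isEmpty()) {
            pendingToBeMerged.wait();
          }
          inputs = pendingToBeMerged.removeFirst();
        }
      } catch (InterruptedException ie) {
        return;  // shutting down
      }
      try {
        processBatch(inputs);  // stand-in for the real merge work
      } finally {
        synchronized (this) {
          numPending.decrementAndGet();
          notifyAll();  // wake any waitForMerge() caller
        }
      }
    }
  }

  // Placeholder for the actual merge.
  protected void processBatch(List<T> inputs) {
    System.out.println(getName() + ": merging " + inputs.size() + " segments");
  }
}

The producer never checks whether a merge is already running; it simply enqueues and bumps the counter, so a request can no longer be lost in the window between the worker finishing one merge and clearing a busy flag.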