Author: shv
Date: Mon Apr 23 22:26:51 2012
New Revision: 1329486

URL: http://svn.apache.org/viewvc?rev=1329486&view=rev
Log:
MAPREDUCE-4164. Fix Communication exception thrown after task completion. Contributed by Mayank Bansal.
Modified: hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java Modified: hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt?rev=1329486&r1=1329485&r2=1329486&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt Mon Apr 23 22:26:51 2012 @@ -23,6 +23,9 @@ Release 0.22.1 - Unreleased MAPREDUCE-3837. Job tracker is not able to recover jobs after crash. (Mayank Bansal via shv) + MAPREDUCE-4164. Fix Communication exception thrown after task completion. + (Mayank Bansal via shv) + Release 0.22.0 - 2011-11-29 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java?rev=1329486&r1=1329485&r2=1329486&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java Mon Apr 23 22:26:51 2012 @@ -536,6 +536,8 @@ abstract public class Task implements Wr private InputSplit split = null; private Progress taskProgress; private Thread pingThread = null; + private boolean done = true; + private Object lock = new Object(); /** * flag that indicates whether progress update needs to be sent to parent. 
@@ -627,6 +629,9 @@ abstract public class Task implements Wr // get current flag value and reset it as well boolean sendProgress = resetProgressFlag(); while (!taskDone.get()) { + synchronized (lock) { + done = false; + } try { boolean taskFound = true; // whether TT knows about this task // sleep for a bit @@ -659,6 +664,7 @@ abstract public class Task implements Wr // came back up), kill ourselves if (!taskFound) { LOG.warn("Parent died. Exiting "+taskId); + resetDoneFlag(); System.exit(66); } @@ -671,11 +677,22 @@ abstract public class Task implements Wr if (remainingRetries == 0) { ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); LOG.warn("Last retry, killing "+taskId); + resetDoneFlag(); System.exit(65); } } } + //Notify that we are done with the work + resetDoneFlag(); } + + void resetDoneFlag() { + synchronized (lock) { + done = true; + lock.notify(); + } + } + public void startCommunicationThread() { if (pingThread == null) { pingThread = new Thread(this, "communication thread"); @@ -685,6 +702,11 @@ abstract public class Task implements Wr } public void stopCommunicationThread() throws InterruptedException { if (pingThread != null) { + synchronized (lock) { + while (!done) { + lock.wait(); + } + } pingThread.interrupt(); pingThread.join(); }