Author: shv
Date: Mon Apr 23 22:26:51 2012
New Revision: 1329486

URL: http://svn.apache.org/viewvc?rev=1329486&view=rev
Log:
MAPREDUCE-4164. Fix Communication exception thrown after task completion. 
Contributed by Mayank Bansal.

Modified:
    hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt
    hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java

Modified: hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt?rev=1329486&r1=1329485&r2=1329486&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt Mon Apr 23 22:26:51 2012
@@ -23,6 +23,9 @@ Release 0.22.1 - Unreleased
     MAPREDUCE-3837. Job tracker is not able to recover jobs after crash.
     (Mayank Bansal via shv) 
 
+    MAPREDUCE-4164. Fix Communication exception thrown after task completion.
+    (Mayank Bansal via shv) 
+
 Release 0.22.0 - 2011-11-29
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java?rev=1329486&r1=1329485&r2=1329486&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java Mon Apr 23 22:26:51 2012
@@ -536,6 +536,8 @@ abstract public class Task implements Wr
     private InputSplit split = null;
     private Progress taskProgress;
     private Thread pingThread = null;
+    private boolean done = true;
+    private Object lock = new Object();
 
     /**
      * flag that indicates whether progress update needs to be sent to parent.
@@ -627,6 +629,9 @@ abstract public class Task implements Wr
       // get current flag value and reset it as well
       boolean sendProgress = resetProgressFlag();
       while (!taskDone.get()) {
+         synchronized (lock) {
+          done = false;
+       }
         try {
           boolean taskFound = true; // whether TT knows about this task
           // sleep for a bit
@@ -659,6 +664,7 @@ abstract public class Task implements Wr
           // came back up), kill ourselves
           if (!taskFound) {
             LOG.warn("Parent died.  Exiting "+taskId);
+            resetDoneFlag();
             System.exit(66);
           }
 
@@ -671,11 +677,22 @@ abstract public class Task implements Wr
           if (remainingRetries == 0) {
             ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
             LOG.warn("Last retry, killing "+taskId);
+            resetDoneFlag();
             System.exit(65);
           }
         }
       }
+      //Notify that we are done with the work
+      resetDoneFlag();
     }
+    
+    void resetDoneFlag() {
+      synchronized (lock) {
+        done = true;
+        lock.notify();
+      }
+    }
+    
     public void startCommunicationThread() {
       if (pingThread == null) {
         pingThread = new Thread(this, "communication thread");
@@ -685,6 +702,11 @@ abstract public class Task implements Wr
     }
     public void stopCommunicationThread() throws InterruptedException {
       if (pingThread != null) {
+        synchronized (lock) {
+          while (!done) {
+            lock.wait();
+          }
+        }
         pingThread.interrupt();
         pingThread.join();
       }


Reply via email to