Author: tgraves
Date: Fri Aug 31 20:43:46 2012
New Revision: 1379599

URL: http://svn.apache.org/viewvc?rev=1379599&view=rev
Log:
MAPREDUCE-4611. MR AM dies badly when Node is decommissioned (Robert Evans via 
tgraves)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1379599&r1=1379598&r2=1379599&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Aug 31 
20:43:46 2012
@@ -858,6 +858,9 @@ Release 0.23.3 - UNRELEASED
 
     MAPREDUCE-4614. Simplify debugging a job's tokens (daryn via bobby)
 
+    MAPREDUCE-4611. MR AM dies badly when Node is decommissioned (Robert
+    Evans via tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1379599&r1=1379598&r2=1379599&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
 Fri Aug 31 20:43:46 2012
@@ -99,8 +99,8 @@ public class JobHistoryEventHandler exte
   protected static final Map<JobId, MetaInfo> fileMap =
     Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
 
-  // Has a signal (SIGTERM etc) been issued?
-  protected volatile boolean isSignalled = false;
+  // should job completion be force when the AM shuts down?
+  protected volatile boolean forceJobCompletion = false;
 
   public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
@@ -322,7 +322,7 @@ public class JobHistoryEventHandler exte
     // Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
     // closed their event writers
     Iterator<JobId> jobIt = fileMap.keySet().iterator();
-    if(isSignalled) {
+    if(forceJobCompletion) {
       while (jobIt.hasNext()) {
         JobId toClose = jobIt.next();
         MetaInfo mi = fileMap.get(toClose);
@@ -911,9 +911,9 @@ public class JobHistoryEventHandler exte
     return tmpFileName.substring(0, tmpFileName.length()-4);
   }
 
-  public void setSignalled(boolean isSignalled) {
-    this.isSignalled = isSignalled;
-    LOG.info("JobHistoryEventHandler notified that isSignalled was "
-      + isSignalled);
+  public void setForcejobCompletion(boolean forceJobCompletion) {
+    this.forceJobCompletion = forceJobCompletion;
+    LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
+      + forceJobCompletion);
   }
 }

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1379599&r1=1379598&r2=1379599&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
 Fri Aug 31 20:43:46 2012
@@ -170,6 +170,8 @@ public class MRAppMaster extends Composi
   private Credentials fsTokens = new Credentials(); // Filled during init
   private UserGroupInformation currentUser; // Will be setup during init
 
+  private volatile boolean isLastAMRetry = false;
+
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       long appSubmitTime) {
@@ -195,11 +197,21 @@ public class MRAppMaster extends Composi
 
   @Override
   public void init(final Configuration conf) {
-
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
     downloadTokensAndSetupUGI(conf);
-
+    
+    //TODO this is a hack, we really need the RM to inform us when we
+    // are the last one.  This would allow us to configure retries on
+    // a per application basis.
+    int numAMRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, 
+        YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
+    isLastAMRetry = appAttemptID.getAttemptId() >= numAMRetries;
+    LOG.info("AM Retries: " + numAMRetries + 
+        " attempt num: " + appAttemptID.getAttemptId() +
+        " is last retry: " + isLastAMRetry);
+    
+    
     context = new RunningAppContext(conf);
 
     // Job name is the same as the app name util we support DAG of jobs
@@ -417,6 +429,8 @@ public class MRAppMaster extends Composi
       }
 
       try {
+        //We are finishing cleanly so this is the last retry
+        isLastAMRetry = true;
         // Stop all services
         // This will also send the final report to the ResourceManager
         LOG.info("Calling stop for all the services");
@@ -666,7 +680,11 @@ public class MRAppMaster extends Composi
     }
 
     public void setSignalled(boolean isSignalled) {
-      ((RMCommunicator) containerAllocator).setSignalled(true);
+      ((RMCommunicator) containerAllocator).setSignalled(isSignalled);
+    }
+    
+    public void setShouldUnregister(boolean shouldUnregister) {
+      ((RMCommunicator) 
containerAllocator).setShouldUnregister(shouldUnregister);
     }
   }
 
@@ -717,7 +735,12 @@ public class MRAppMaster extends Composi
     @Override
     public synchronized void stop() {
       try {
-        cleanupStagingDir();
+        if(isLastAMRetry) {
+          cleanupStagingDir();
+        } else {
+          LOG.info("Skipping cleaning up the staging dir. "
+              + "assuming AM will be retried.");
+        }
       } catch (IOException io) {
         LOG.error("Failed to cleanup staging dir: ", io);
       }
@@ -1016,14 +1039,19 @@ public class MRAppMaster extends Composi
     public void run() {
       LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
         + "JobHistoryEventHandler.");
+
       // Notify the JHEH and RMCommunicator that a SIGTERM has been received so
       // that they don't take too long in shutting down
       if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
         ((ContainerAllocatorRouter) appMaster.containerAllocator)
         .setSignalled(true);
+        ((ContainerAllocatorRouter) appMaster.containerAllocator)
+        .setShouldUnregister(appMaster.isLastAMRetry);
       }
+      
       if(appMaster.jobHistoryEventHandler != null) {
-        appMaster.jobHistoryEventHandler.setSignalled(true);
+        appMaster.jobHistoryEventHandler
+          .setForcejobCompletion(appMaster.isLastAMRetry);
       }
       appMaster.stop();
     }

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1379599&r1=1379598&r2=1379599&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
 Fri Aug 31 20:43:46 2012
@@ -84,6 +84,7 @@ public abstract class RMCommunicator ext
   private Job job;
   // Has a signal (SIGTERM etc) been issued?
   protected volatile boolean isSignalled = false;
+  private volatile boolean shouldUnregister = true;
 
   public RMCommunicator(ClientService clientService, AppContext context) {
     super("RMCommunicator");
@@ -213,7 +214,9 @@ public abstract class RMCommunicator ext
     } catch (InterruptedException ie) {
       LOG.warn("InterruptedException while stopping", ie);
     }
-    unregister();
+    if(shouldUnregister) {
+      unregister();
+    }
     super.stop();
   }
 
@@ -288,8 +291,15 @@ public abstract class RMCommunicator ext
 
   protected abstract void heartbeat() throws Exception;
 
+  public void setShouldUnregister(boolean shouldUnregister) {
+    this.shouldUnregister = shouldUnregister;
+    LOG.info("RMCommunicator notified that shouldUnregistered is: " 
+        + shouldUnregister);
+  }
+  
   public void setSignalled(boolean isSignalled) {
     this.isSignalled = isSignalled;
-    LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+    LOG.info("RMCommunicator notified that iSignalled is: " 
+        + isSignalled);
   }
 }

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1379599&r1=1379598&r2=1379599&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
 Fri Aug 31 20:43:46 2012
@@ -330,7 +330,7 @@ public class TestJobHistoryEventHandler 
     Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
 
     jheh.addToFileMap(jobId);
-    jheh.setSignalled(true);
+    jheh.setForcejobCompletion(true);
     for(int i=0; i < numEvents; ++i) {
       events[i] = getEventToEnqueue(jobId);
       jheh.handle(events[i]);

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1379599&r1=1379598&r2=1379599&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
 Fri Aug 31 20:43:46 2012
@@ -23,6 +23,7 @@ import static org.mockito.Matchers.anyBo
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 
@@ -47,6 +48,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -89,28 +91,94 @@ import org.junit.Test;
      handler.handle(new JobFinishEvent(jobid));
      verify(fs).delete(stagingJobPath, true);
    }
+   
+   @Test
+   public void testDeletionofStagingOnKill() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+         ApplicationAttemptId.class);
+     attemptId.setAttemptId(0);
+     ApplicationId appId = 
recordFactory.newRecordInstance(ApplicationId.class);
+     appId.setClusterTimestamp(System.currentTimeMillis());
+     appId.setId(0);
+     attemptId.setApplicationId(appId);
+     JobId jobid = recordFactory.newRecordInstance(JobId.class);
+     jobid.setAppId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+     appMaster.init(conf);
+     //simulate the process being killed
+     MRAppMaster.MRAppMasterShutdownHook hook = 
+       new MRAppMaster.MRAppMasterShutdownHook(appMaster);
+     hook.run();
+     verify(fs, times(0)).delete(stagingJobPath, true);
+   }
+   
+   @Test
+   public void testDeletionofStagingOnKillLastTry() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+         ApplicationAttemptId.class);
+     attemptId.setAttemptId(1);
+     ApplicationId appId = 
recordFactory.newRecordInstance(ApplicationId.class);
+     appId.setClusterTimestamp(System.currentTimeMillis());
+     appId.setId(0);
+     attemptId.setApplicationId(appId);
+     JobId jobid = recordFactory.newRecordInstance(JobId.class);
+     jobid.setAppId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+     appMaster.init(conf);
+     //simulate the process being killed
+     MRAppMaster.MRAppMasterShutdownHook hook = 
+       new MRAppMaster.MRAppMasterShutdownHook(appMaster);
+     hook.run();
+     verify(fs).delete(stagingJobPath, true);
+   }
 
    private class TestMRApp extends MRAppMaster {
+     ContainerAllocator allocator;
 
-    public TestMRApp(ApplicationAttemptId applicationAttemptId) {
-      super(applicationAttemptId, BuilderUtils.newContainerId(
-          applicationAttemptId, 1), "testhost", 2222, 3333, System
-          .currentTimeMillis());
-    }
-     
-    @Override
-    protected FileSystem getFileSystem(Configuration conf) {
-      return fs;
-    }
-    
-    @Override
-    protected void sysexit() {      
-    }
-    
-    @Override
-    public Configuration getConfig() {
-      return conf;
-    }
+     public TestMRApp(ApplicationAttemptId applicationAttemptId, 
+         ContainerAllocator allocator) {
+       super(applicationAttemptId, BuilderUtils.newContainerId(
+           applicationAttemptId, 1), "testhost", 2222, 3333, System
+           .currentTimeMillis());
+       this.allocator = allocator;
+     }
+
+     public TestMRApp(ApplicationAttemptId applicationAttemptId) {
+       this(applicationAttemptId, null);
+     }
+
+     @Override
+     protected FileSystem getFileSystem(Configuration conf) {
+       return fs;
+     }
+
+     @Override
+     protected ContainerAllocator createContainerAllocator(
+         final ClientService clientService, final AppContext context) {
+       if(allocator == null) {
+         return super.createContainerAllocator(clientService, context);
+       }
+       return allocator;
+     }
+
+     @Override
+     protected void sysexit() {      
+     }
+
+     @Override
+     public Configuration getConfig() {
+       return conf;
+     }
    }
 
   private final class MRAppTestCleanup extends MRApp {


Reply via email to