Author: vinodkv
Date: Sun Oct  6 20:53:28 2013
New Revision: 1529682

URL: http://svn.apache.org/r1529682
Log:
MAPREDUCE-5562. Fixed MR App Master to perform pending tasks like staging-dir 
cleanup, sending job-end notification correctly when unregister with RM fails. 
Contributed by Zhijie Shen.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sun Oct  6 
20:53:28 2013
@@ -282,6 +282,10 @@ Release 2.1.2 - UNRELEASED
     aren't heart-beating for a while, so that we can aggressively speculate
     instead of waiting for task-timeout (Xuan Gong via vinodkv)
 
+    MAPREDUCE-5562. Fixed MR App Master to perform pending tasks like 
staging-dir
+    cleanup, sending job-end notification correctly when unregister with RM
+    fails. (Zhijie Shen via vinodkv)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
 Sun Oct  6 20:53:28 2013
@@ -64,6 +64,6 @@ public interface AppContext {
 
   boolean isLastAMRetry();
 
-  boolean safeToReportTerminationToUser();
+  boolean hasSuccessfullyUnregistered();
 
 }

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
 Sun Oct  6 20:53:28 2013
@@ -18,7 +18,21 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,20 +41,37 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.http.HttpConfig;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LocalContainerLauncher;
+import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.jobhistory.*;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.hadoop.mapreduce.v2.api.records.*;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
@@ -51,14 +82,26 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.mapreduce.v2.app.job.event.*;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
-import org.apache.hadoop.mapreduce.v2.app.rm.*;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
@@ -95,14 +138,7 @@ import org.apache.hadoop.yarn.util.Clock
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * The Map-Reduce Application Master.
@@ -166,7 +202,8 @@ public class MRAppMaster extends Composi
   private Credentials jobCredentials = new Credentials(); // Filled during init
   protected UserGroupInformation currentUser; // Will be setup during init
 
-  private volatile boolean isLastAMRetry = false;
+  @VisibleForTesting
+  protected volatile boolean isLastAMRetry = false;
   //Something happened and we should shut down right after we start up.
   boolean errorHappenedShutDown = false;
   private String shutDownMessage = null;
@@ -175,7 +212,7 @@ public class MRAppMaster extends Composi
   private long recoveredJobStartTime = 0;
 
   @VisibleForTesting
-  protected AtomicBoolean safeToReportTerminationToUser =
+  protected AtomicBoolean successfullyUnregistered =
       new AtomicBoolean(false);
 
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
@@ -208,14 +245,14 @@ public class MRAppMaster extends Composi
 
     initJobCredentialsAndUGI(conf);
 
-    isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+    context = new RunningAppContext(conf);
+
+    ((RunningAppContext)context).computeIsLastAMRetry();
     LOG.info("The specific max attempts: " + maxAppAttempts +
         " for application: " + appAttemptID.getApplicationId().getId() +
         ". Attempt num: " + appAttemptID.getAttemptId() +
         " is last retry: " + isLastAMRetry);
 
-    context = new RunningAppContext(conf);
-
     // Job name is the same as the app name util we support DAG of jobs
     // for an app later
     appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
@@ -511,11 +548,6 @@ public class MRAppMaster extends Composi
       MRAppMaster.this.stop();
 
       if (isLastAMRetry) {
-        // Except ClientService, other services are already stopped, it is 
safe to
-        // let clients know the final states. ClientService should wait for 
some
-        // time so clients have enough time to know the final states.
-        safeToReportTerminationToUser.set(true);
-
         // Send job-end notification when it is safe to report termination to
         // users and it is the last AM retry
         if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
@@ -524,7 +556,14 @@ public class MRAppMaster extends Composi
                 + job.getReport().getJobId());
             JobEndNotifier notifier = new JobEndNotifier();
             notifier.setConf(getConfig());
-            notifier.notify(job.getReport());
+            JobReport report = job.getReport();
+            // If unregistration fails, the final state is unavailable. 
However,
+            // at the last AM Retry, the client will finally be notified FAILED
+            // from RM, so we should let users know FAILED via notifier as well
+            if (!context.hasSuccessfullyUnregistered()) {
+              report.setJobState(JobState.FAILED);
+            }
+            notifier.notify(report);
           } catch (InterruptedException ie) {
             LOG.warn("Job end notification interrupted for jobID : "
                 + job.getReport().getJobId(), ie);
@@ -863,7 +902,7 @@ public class MRAppMaster extends Composi
     }
   }
 
-  private class RunningAppContext implements AppContext {
+  public class RunningAppContext implements AppContext {
 
     private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
     private final Configuration conf;
@@ -942,8 +981,16 @@ public class MRAppMaster extends Composi
     }
 
     @Override
-    public boolean safeToReportTerminationToUser() {
-      return safeToReportTerminationToUser.get();
+    public boolean hasSuccessfullyUnregistered() {
+      return successfullyUnregistered.get();
+    }
+
+    public void markSuccessfulUnregistration() {
+      successfullyUnregistered.set(true);
+    }
+
+    public void computeIsLastAMRetry() {
+      isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
     }
   }
 

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
 Sun Oct  6 20:53:28 2013
@@ -128,8 +128,6 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /** Implementation of Job interface. Maintains the state machines of Job.
  * The read and write calls use ReadWriteLock for concurrency.
  */
@@ -933,7 +931,7 @@ public class JobImpl implements org.apac
     readLock.lock();
     try {
       JobState state = getExternalState(getInternalState());
-      if (!appContext.safeToReportTerminationToUser()
+      if (!appContext.hasSuccessfullyUnregistered()
           && (state == JobState.SUCCEEDED || state == JobState.FAILED
           || state == JobState.KILLED || state == JobState.ERROR)) {
         return lastNonFinalState;

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
 Sun Oct  6 20:53:28 2013
@@ -29,11 +29,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
@@ -52,10 +52,13 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Registers/unregisters to RM and sends heartbeats to RM.
  */
@@ -171,41 +174,57 @@ public abstract class RMCommunicator ext
 
   protected void unregister() {
     try {
-      FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
-      JobImpl jobImpl = (JobImpl)job;
-      if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) {
-        finishState = FinalApplicationStatus.SUCCEEDED;
-      } else if (jobImpl.getInternalState() == JobStateInternal.KILLED
-          || (jobImpl.getInternalState() == JobStateInternal.RUNNING && 
isSignalled)) {
-        finishState = FinalApplicationStatus.KILLED;
-      } else if (jobImpl.getInternalState() == JobStateInternal.FAILED
-          || jobImpl.getInternalState() == JobStateInternal.ERROR) {
-        finishState = FinalApplicationStatus.FAILED;
-      }
-      StringBuffer sb = new StringBuffer();
-      for (String s : job.getDiagnostics()) {
-        sb.append(s).append("\n");
-      }
-      LOG.info("Setting job diagnostics to " + sb.toString());
-
-      String historyUrl =
-          MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(),
-              context.getApplicationID());
-      LOG.info("History url is " + historyUrl);
-      FinishApplicationMasterRequest request =
-          FinishApplicationMasterRequest.newInstance(finishState,
-            sb.toString(), historyUrl);
-      while (true) {
-        FinishApplicationMasterResponse response =
-            scheduler.finishApplicationMaster(request);
-        if (response.getIsUnregistered()) {
-          break;
-        }
-        LOG.info("Waiting for application to be successfully unregistered.");
-        Thread.sleep(rmPollInterval);
-      }
+      doUnregistration();
     } catch(Exception are) {
       LOG.error("Exception while unregistering ", are);
+      // if unregistration failed, isLastAMRetry needs to be recalculated
+      // to see whether AM really has the chance to retry
+      RunningAppContext raContext = (RunningAppContext) context;
+      raContext.computeIsLastAMRetry();
+    }
+  }
+
+  @VisibleForTesting
+  protected void doUnregistration()
+      throws YarnException, IOException, InterruptedException {
+    FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+    JobImpl jobImpl = (JobImpl)job;
+    if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) {
+      finishState = FinalApplicationStatus.SUCCEEDED;
+    } else if (jobImpl.getInternalState() == JobStateInternal.KILLED
+        || (jobImpl.getInternalState() == JobStateInternal.RUNNING && 
isSignalled)) {
+      finishState = FinalApplicationStatus.KILLED;
+    } else if (jobImpl.getInternalState() == JobStateInternal.FAILED
+        || jobImpl.getInternalState() == JobStateInternal.ERROR) {
+      finishState = FinalApplicationStatus.FAILED;
+    }
+    StringBuffer sb = new StringBuffer();
+    for (String s : job.getDiagnostics()) {
+      sb.append(s).append("\n");
+    }
+    LOG.info("Setting job diagnostics to " + sb.toString());
+
+    String historyUrl =
+        MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(),
+            context.getApplicationID());
+    LOG.info("History url is " + historyUrl);
+    FinishApplicationMasterRequest request =
+        FinishApplicationMasterRequest.newInstance(finishState,
+          sb.toString(), historyUrl);
+    while (true) {
+      FinishApplicationMasterResponse response =
+          scheduler.finishApplicationMaster(request);
+      if (response.getIsUnregistered()) {
+        // When excepting ClientService, other services are already stopped,
+        // it is safe to let clients know the final states. ClientService
+        // should wait for some time so clients have enough time to know the
+        // final states.
+        RunningAppContext raContext = (RunningAppContext) context;
+        raContext.markSuccessfulUnregistration();
+        break;
+      }
+      LOG.info("Waiting for application to be successfully unregistered.");
+      Thread.sleep(rmPollInterval);
     }
   }
 
@@ -235,7 +254,6 @@ public abstract class RMCommunicator ext
 
   protected void startAllocatorThread() {
     allocatorThread = new Thread(new Runnable() {
-      @SuppressWarnings("unchecked")
       @Override
       public void run() {
         while (!stopped.get() && !Thread.currentThread().isInterrupted()) {

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
 Sun Oct  6 20:53:28 2013
@@ -136,9 +136,9 @@ public class MRApp extends MRAppMaster {
   }
 
   public MRApp(int maps, int reduces, boolean autoComplete, String testName,
-      boolean cleanOnStart, Clock clock, boolean shutdown) {
+      boolean cleanOnStart, Clock clock, boolean unregistered) {
     this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock,
-        shutdown);
+        unregistered);
   }
 
   public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@@ -147,8 +147,8 @@ public class MRApp extends MRAppMaster {
   }
 
   public MRApp(int maps, int reduces, boolean autoComplete, String testName,
-      boolean cleanOnStart, boolean shutdown) {
-    this(maps, reduces, autoComplete, testName, cleanOnStart, 1, shutdown);
+      boolean cleanOnStart, boolean unregistered) {
+    this(maps, reduces, autoComplete, testName, cleanOnStart, 1, unregistered);
   }
 
   @Override
@@ -181,16 +181,16 @@ public class MRApp extends MRAppMaster {
   }
 
   public MRApp(int maps, int reduces, boolean autoComplete, String testName,
-      boolean cleanOnStart, int startCount, boolean shutdown) {
+      boolean cleanOnStart, int startCount, boolean unregistered) {
     this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
-        new SystemClock(), shutdown);
+        new SystemClock(), unregistered);
   }
 
   public MRApp(int maps, int reduces, boolean autoComplete, String testName,
-      boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) {
+      boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) 
{
     this(getApplicationAttemptId(applicationId, startCount), getContainerId(
       applicationId, startCount), maps, reduces, autoComplete, testName,
-      cleanOnStart, startCount, clock, shutdown);
+      cleanOnStart, startCount, clock, unregistered);
   }
 
   public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@@ -202,9 +202,9 @@ public class MRApp extends MRAppMaster {
 
   public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
       int maps, int reduces, boolean autoComplete, String testName,
-      boolean cleanOnStart, int startCount, boolean shutdown) {
+      boolean cleanOnStart, int startCount, boolean unregistered) {
     this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
-        cleanOnStart, startCount, new SystemClock(), shutdown);
+        cleanOnStart, startCount, new SystemClock(), unregistered);
   }
 
   public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
@@ -216,7 +216,7 @@ public class MRApp extends MRAppMaster {
 
   public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
       int maps, int reduces, boolean autoComplete, String testName,
-      boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) {
+      boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) 
{
     super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, 
System
         .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
     this.testWorkDir = new File("target", testName);
@@ -237,7 +237,7 @@ public class MRApp extends MRAppMaster {
     this.autoComplete = autoComplete;
     // If safeToReportTerminationToUser is set to true, we can verify whether
     // the job can reaches the final state when MRAppMaster shuts down.
-    this.safeToReportTerminationToUser.set(shutdown);
+    this.successfullyUnregistered.set(unregistered);
   }
 
   @Override

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
 Sun Oct  6 20:53:28 2013
@@ -137,7 +137,7 @@ public class MockAppContext implements A
   }
 
   @Override
-  public boolean safeToReportTerminationToUser() {
+  public boolean hasSuccessfullyUnregistered() {
     // bogus - Not Required
     return true;
   }

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
 Sun Oct  6 20:53:28 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.a
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
@@ -41,10 +42,16 @@ import org.apache.hadoop.mapred.JobConte
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -185,25 +192,19 @@ public class TestJobEndNotifier extends 
   }
 
   @Test
-  public void testNotificationOnNormalShutdown() throws Exception {
+  public void testNotificationOnLastRetryNormalShutdown() throws Exception {
     HttpServer server = startHttpServer();
     // Act like it is the second attempt. Default max attempts is 2
-    MRApp app = spy(new MRApp(2, 2, true, this.getClass().getName(), true, 2));
-    // Make use of safeToReportflag so that we can look at final job-state as
-    // seen by real users.
-    app.safeToReportTerminationToUser.set(false);
+    MRApp app = spy(new MRAppWithCustomContainerAllocator(
+        2, 2, true, this.getClass().getName(), true, 2, true));
     doNothing().when(app).sysexit();
     Configuration conf = new Configuration();
     conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
         JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
     JobImpl job = (JobImpl)app.submit(conf);
-    // Even though auto-complete is true, because app is not shut-down yet, 
user
-    // will only see RUNNING state.
     app.waitForInternalState(job, JobStateInternal.SUCCEEDED);
-    app.waitForState(job, JobState.RUNNING);
-    // Now shutdown. User should see SUCCEEDED state.
+    // Unregistration succeeds: successfullyUnregistered is set
     app.shutDownJob();
-    app.waitForState(job, JobState.SUCCEEDED);
     Assert.assertEquals(true, app.isLastAMRetry());
     Assert.assertEquals(1, JobEndServlet.calledTimes);
     Assert.assertEquals("jobid=" + job.getID() + "&status=SUCCEEDED",
@@ -214,24 +215,25 @@ public class TestJobEndNotifier extends 
   }
 
   @Test
-  public void testNotificationOnNonLastRetryShutdown() throws Exception {
+  public void testAbsentNotificationOnNotLastRetryUnregistrationFailure()
+      throws Exception {
     HttpServer server = startHttpServer();
-    MRApp app = spy(new MRApp(2, 2, false, this.getClass().getName(), true));
+    MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
+        this.getClass().getName(), true, 1, false));
     doNothing().when(app).sysexit();
-    // Make use of safeToReportflag so that we can look at final job-state as
-    // seen by real users.
-    app.safeToReportTerminationToUser.set(false);
     Configuration conf = new Configuration();
     conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
         JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
-    JobImpl job = (JobImpl)app.submit(new Configuration());
+    JobImpl job = (JobImpl)app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
     app.getContext().getEventHandler()
       .handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT));
     app.waitForInternalState(job, JobStateInternal.REBOOT);
+    // Now shutdown.
+    // Unregistration fails: isLastAMRetry is recalculated, this is not
+    app.shutDownJob();
     // Not the last AM attempt. So user should that the job is still running.
     app.waitForState(job, JobState.RUNNING);
-    app.shutDownJob();
     Assert.assertEquals(false, app.isLastAMRetry());
     Assert.assertEquals(0, JobEndServlet.calledTimes);
     Assert.assertEquals(null, JobEndServlet.requestUri);
@@ -239,6 +241,33 @@ public class TestJobEndNotifier extends 
     server.stop();
   }
 
+  @Test
+  public void testNotificationOnLastRetryUnregistrationFailure()
+      throws Exception {
+    HttpServer server = startHttpServer();
+    MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
+        this.getClass().getName(), true, 2, false));
+    doNothing().when(app).sysexit();
+    Configuration conf = new Configuration();
+    conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
+        JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
+    JobImpl job = (JobImpl)app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    app.getContext().getEventHandler()
+      .handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT));
+    app.waitForInternalState(job, JobStateInternal.REBOOT);
+    // Now shutdown. User should see FAILED state.
+    // Unregistration fails: isLastAMRetry is recalculated, this is
+    app.shutDownJob();
+    Assert.assertEquals(true, app.isLastAMRetry());
+    Assert.assertEquals(1, JobEndServlet.calledTimes);
+    Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED",
+        JobEndServlet.requestUri.getQuery());
+    Assert.assertEquals(JobState.FAILED.toString(),
+      JobEndServlet.foundJobState);
+    server.stop();
+  }
+
   private static HttpServer startHttpServer() throws Exception {
     new File(System.getProperty(
         "build.webapps", "build/webapps") + "/test").mkdirs();
@@ -280,4 +309,83 @@ public class TestJobEndNotifier extends 
     }
   }
 
+  private class MRAppWithCustomContainerAllocator extends MRApp {
+
+    private boolean crushUnregistration;
+
+    public MRAppWithCustomContainerAllocator(int maps, int reduces,
+        boolean autoComplete, String testName, boolean cleanOnStart,
+        int startCount, boolean crushUnregistration) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
+          false);
+      this.crushUnregistration = crushUnregistration;
+    }
+
+    @Override
+    protected ContainerAllocator createContainerAllocator(
+        ClientService clientService, AppContext context) {
+      context = spy(context);
+      when(context.getEventHandler()).thenReturn(null);
+      when(context.getApplicationID()).thenReturn(null);
+      return new CustomContainerAllocator(this, context);
+    }
+
+    private class CustomContainerAllocator
+        extends RMCommunicator
+        implements ContainerAllocator, RMHeartbeatHandler {
+      private MRAppWithCustomContainerAllocator app;
+      private MRAppContainerAllocator allocator =
+          new MRAppContainerAllocator();
+
+      public CustomContainerAllocator(
+          MRAppWithCustomContainerAllocator app, AppContext context) {
+        super(null, context);
+        this.app = app;
+      }
+
+      @Override
+      public void serviceInit(Configuration conf) {
+      }
+
+      @Override
+      public void serviceStart() {
+      }
+
+      @Override
+      public void serviceStop() {
+        unregister();
+      }
+
+      @Override
+      protected void doUnregistration()
+          throws YarnException, IOException, InterruptedException {
+        if (crushUnregistration) {
+          app.successfullyUnregistered.set(true);
+        } else {
+          throw new YarnException("test exception");
+        }
+      }
+
+      @Override
+      public void handle(ContainerAllocatorEvent event) {
+        allocator.handle(event);
+      }
+
+      @Override
+      public long getLastHeartbeatTime() {
+        return allocator.getLastHeartbeatTime();
+      }
+
+      @Override
+      public void runOnNextHeartbeat(Runnable callback) {
+        allocator.runOnNextHeartbeat(callback);
+      }
+
+      @Override
+      protected void heartbeat() throws Exception {
+      }
+    }
+
+  }
+
 }

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
 Sun Oct  6 20:53:28 2013
@@ -29,7 +29,6 @@ import java.util.Iterator;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -44,7 +43,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -55,15 +53,12 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.util.Clock;
 import org.junit.Test;
 
 /**
@@ -384,12 +379,13 @@ public class TestMRApp {
     // AM is not unregistered
     Assert.assertEquals(JobState.RUNNING, job.getState());
     // imitate that AM is unregistered
-    app.safeToReportTerminationToUser.set(true);
+    app.successfullyUnregistered.set(true);
     app.waitForState(job, JobState.SUCCEEDED);
   }
 
   @Test
-  public void testJobRebootNotLastRetry() throws Exception {
+  public void testJobRebootNotLastRetryOnUnregistrationFailure()
+      throws Exception {
     MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
     Job job = app.submit(new Configuration());
     app.waitForState(job, JobState.RUNNING);
@@ -408,10 +404,12 @@ public class TestMRApp {
   }
 
   @Test
-  public void testJobRebootOnLastRetry() throws Exception {
+  public void testJobRebootOnLastRetryOnUnregistrationFailure()
+      throws Exception {
     // make startCount as 2 since this is last retry which equals to
     // DEFAULT_MAX_AM_RETRY
-    MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2);
+    // The last param mocks the unregistration failure
+    MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2, 
false);
 
     Configuration conf = new Configuration();
     Job job = app.submit(conf);
@@ -425,8 +423,10 @@ public class TestMRApp {
     app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
       JobEventType.JOB_AM_REBOOT));
 
-    // return exteranl state as ERROR if this is the last retry
-    app.waitForState(job, JobState.ERROR);
+    app.waitForInternalState((JobImpl) job, JobStateInternal.REBOOT);
+    // return exteranl state as RUNNING if this is the last retry while
+    // unregistration fails
+    app.waitForState(job, JobState.RUNNING);
   }
 
   private final class MRAppWithSpiedJob extends MRApp {

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
 Sun Oct  6 20:53:28 2013
@@ -869,7 +869,7 @@ public class TestRuntimeEstimators {
     }
 
     @Override
-    public boolean safeToReportTerminationToUser() {
+    public boolean hasSuccessfullyUnregistered() {
       // bogus - Not Required
       return true;
     }

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
 Sun Oct  6 20:53:28 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.a
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -36,18 +37,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
-import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -57,7 +57,7 @@ import org.apache.hadoop.service.Service
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -75,7 +75,44 @@ import org.junit.Test;
    private Path stagingJobPath = new Path(stagingJobDir);
    private final static RecordFactory recordFactory = RecordFactoryProvider.
        getRecordFactory(null);
-   
+
+   @Test
+   public void testDeletionofStagingOnUnregistrationFailure()
+       throws IOException {
+     testDeletionofStagingOnUnregistrationFailure(2, false);
+     testDeletionofStagingOnUnregistrationFailure(1, true);
+   }
+
+   @SuppressWarnings("resource")
+   private void testDeletionofStagingOnUnregistrationFailure(
+       int maxAttempts, boolean shouldHaveDeleted) throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
+     ApplicationId appId = ApplicationId.newInstance(0, 1);
+     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 
1);
+     JobId jobid = recordFactory.newRecordInstance(JobId.class);
+     jobid.setAppId(appId);
+     TestMRApp appMaster = new TestMRApp(attemptId, null,
+         JobStateInternal.RUNNING, maxAttempts);
+     appMaster.crushUnregistration = true;
+     appMaster.init(conf);
+     appMaster.start();
+     appMaster.shutDownJob();
+     ((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry();
+     if (shouldHaveDeleted) {
+       Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry());
+       verify(fs).delete(stagingJobPath, true);
+     } else {
+       Assert.assertEquals(new Boolean(false), appMaster.isLastAMRetry());
+       verify(fs, never()).delete(stagingJobPath, true);
+     }
+   }
+
    @Test
    public void testDeletionofStaging() throws IOException {
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
@@ -204,6 +241,7 @@ import org.junit.Test;
      ContainerAllocator allocator;
      boolean testIsLastAMRetry = false;
      JobStateInternal jobStateInternal;
+     boolean crushUnregistration = false;
 
      public TestMRApp(ApplicationAttemptId applicationAttemptId, 
          ContainerAllocator allocator, int maxAppAttempts) {
@@ -211,6 +249,7 @@ import org.junit.Test;
            applicationAttemptId, 1), "testhost", 2222, 3333,
            System.currentTimeMillis(), maxAppAttempts);
        this.allocator = allocator;
+       this.successfullyUnregistered.set(true);
      }
 
      public TestMRApp(ApplicationAttemptId applicationAttemptId,
@@ -229,7 +268,11 @@ import org.junit.Test;
      protected ContainerAllocator createContainerAllocator(
          final ClientService clientService, final AppContext context) {
        if(allocator == null) {
-         return super.createContainerAllocator(clientService, context);
+         if (crushUnregistration) {
+           return new CustomContainerAllocator(context);
+         } else {
+           return super.createContainerAllocator(clientService, context);
+         }
        }
        return allocator;
      }
@@ -280,6 +323,41 @@ import org.junit.Test;
      public boolean getTestIsLastAMRetry(){
        return testIsLastAMRetry;
      }
+
+    private class CustomContainerAllocator extends RMCommunicator
+        implements ContainerAllocator {
+
+      public CustomContainerAllocator(AppContext context) {
+        super(null, context);
+      }
+
+      @Override
+      public void serviceInit(Configuration conf) {
+      }
+
+      @Override
+      public void serviceStart() {
+      }
+
+      @Override
+      public void serviceStop() {
+        unregister();
+      }
+
+      @Override
+      protected void doUnregistration()
+          throws YarnException, IOException, InterruptedException {
+        throw new YarnException("test exception");
+      }
+
+      @Override
+      protected void heartbeat() throws Exception {
+      }
+
+      @Override
+      public void handle(ContainerAllocatorEvent event) {
+      }
+    }
    }
 
   private final class MRAppTestCleanup extends MRApp {

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
 Sun Oct  6 20:53:28 2013
@@ -275,7 +275,7 @@ public class TestJobImpl {
 
     AppContext mockContext = mock(AppContext.class);
     when(mockContext.isLastAMRetry()).thenReturn(true);
-    when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
+    when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
     JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
     completeJobTasks(job);
     assertJobState(job, JobStateInternal.COMMITTING);
@@ -285,7 +285,7 @@ public class TestJobImpl {
     assertJobState(job, JobStateInternal.REBOOT);
     // return the external state as ERROR since this is last retry.
     Assert.assertEquals(JobState.RUNNING, job.getState());
-    when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
+    when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
     Assert.assertEquals(JobState.ERROR, job.getState());
 
     dispatcher.stop();
@@ -594,7 +594,7 @@ public class TestJobImpl {
         new JobDiagnosticsUpdateEvent(jobId, diagMsg);
     MRAppMetrics mrAppMetrics = MRAppMetrics.create();
     AppContext mockContext = mock(AppContext.class);
-    when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
+    when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), new Configuration(),
         mock(EventHandler.class),
@@ -705,7 +705,7 @@ public class TestJobImpl {
     commitHandler.start();
 
     AppContext mockContext = mock(AppContext.class);
-    when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
+    when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
     JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
@@ -722,7 +722,7 @@ public class TestJobImpl {
     job.handle(new JobEvent(jobId, 
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
     assertJobState(job, JobStateInternal.FAILED);
     Assert.assertEquals(JobState.RUNNING, job.getState());
-    when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
+    when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
     Assert.assertEquals(JobState.FAILED, job.getState());
 
     dispatcher.stop();
@@ -762,7 +762,7 @@ public class TestJobImpl {
     JobId jobId = TypeConverter.toYarn(jobID);
     if (appContext == null) {
       appContext = mock(AppContext.class);
-      when(appContext.safeToReportTerminationToUser()).thenReturn(true);
+      when(appContext.hasSuccessfullyUnregistered()).thenReturn(true);
     }
     StubbedJob job = new StubbedJob(jobId,
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
 Sun Oct  6 20:53:28 2013
@@ -89,6 +89,10 @@ public class TestLocalContainerAllocator
     }
 
     @Override
+    protected void unregister() {
+    }
+
+    @Override
     protected void startAllocatorThread() {
       allocatorThread = new Thread();
     }

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1529682&r1=1529681&r2=1529682&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
 Sun Oct  6 20:53:28 2013
@@ -389,7 +389,7 @@ public class JobHistory extends Abstract
   }
 
   @Override
-  public boolean safeToReportTerminationToUser() {
+  public boolean hasSuccessfullyUnregistered() {
     // bogus - Not Required
     return true;
   }


Reply via email to