Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
 (original)
+++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
 Mon Oct 15 21:09:59 2012
@@ -39,9 +39,9 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
-import 
org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
-import 
org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
-import 
org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventTerminated;
+import 
org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated;
+import 
org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminating;
+import 
org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventNodeFailed;
 import 
org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent;
@@ -61,10 +61,9 @@ import org.apache.hadoop.yarn.state.Stat
 public class AMContainerImpl implements AMContainer {
 
   private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
-
+  
   private final ReadLock readLock;
   private final WriteLock writeLock;
-  // TODO Use ContainerId or a custom JvmId.
   private final ContainerId containerId;
   // Container to be used for getters on capability, locality etc.
   private final Container container;
@@ -88,7 +87,7 @@ public class AMContainerImpl implements 
   
   private TaskAttemptId pendingAttempt;
   private TaskAttemptId runningAttempt;
-  private TaskAttemptId interruptedEvent;
+  private List<TaskAttemptId> failedAssignments;
   private TaskAttemptId pullAttempt;
   
   private boolean inError = false;
@@ -109,53 +108,59 @@ public class AMContainerImpl implements 
   private void initStateMachineFactory() {
     stateMachineFactory = 
     stateMachineFactory
-        .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, 
AMContainerEventType.C_START_REQUEST, createLaunchRequestTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, 
AMContainerEventType.C_LAUNCH_REQUEST, createLaunchRequestTransition())
         .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, 
AMContainerEventType.C_ASSIGN_TA, 
createAssignTaskAttemptAtAllocatedTransition())
         .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, 
AMContainerEventType.C_COMPLETED, createCompletedAtAllocatedTransition())
         .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, 
AMContainerEventType.C_STOP_REQUEST, createStopRequestTransition())
         .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, 
AMContainerEventType.C_NODE_FAILED, createNodeFailedAtAllocatedTransition())
-        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, 
EnumSet.of(AMContainerEventType.C_LAUNCHED, 
AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, 
AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED, 
AMContainerEventType.C_TIMED_OUT), createGenericErrorTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, 
EnumSet.of(AMContainerEventType.C_LAUNCHED, 
AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, 
AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, 
AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), 
createGenericErrorTransition())
         
         
-        .addTransition(AMContainerState.LAUNCHING, 
EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOPPING), 
AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptTransition())
+        .addTransition(AMContainerState.LAUNCHING, 
EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED), 
AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptTransition())
         .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, 
AMContainerEventType.C_LAUNCHED, createLaunchedTransition())
         .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, 
AMContainerEventType.C_LAUNCH_FAILED, createLaunchFailedTransition())
         .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, 
AMContainerEventType.C_PULL_TA) // Is assuming the pullAttempt will be null.
         .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, 
AMContainerEventType.C_COMPLETED, createCompletedAtLaunchingTransition())
-        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, 
AMContainerEventType.C_STOP_REQUEST, createStopRequestAtLaunchingTransition())
+        .addTransition(AMContainerState.LAUNCHING, 
AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, 
createStopRequestAtLaunchingTransition())
         .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, 
AMContainerEventType.C_NODE_FAILED, createNodeFailedAtLaunchingTransition())
-        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, 
EnumSet.of(AMContainerEventType.C_START_REQUEST, 
AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED, 
AMContainerEventType.C_TIMED_OUT), createGenericErrorAtLaunchingTransition())
-        
-        
-        .addTransition(AMContainerState.IDLE, 
EnumSet.of(AMContainerState.IDLE, AMContainerState.STOPPING), 
AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtIdleTransition())
+        .addTransition(AMContainerState.LAUNCHING, 
AMContainerState.STOP_REQUESTED, 
EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, 
AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, 
AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), 
createGenericErrorAtLaunchingTransition())
+
+        .addTransition(AMContainerState.IDLE, 
EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), 
AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtIdleTransition())
         .addTransition(AMContainerState.IDLE, 
EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), 
AMContainerEventType.C_PULL_TA, createPullTAAtIdleTransition())
         .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, 
AMContainerEventType.C_COMPLETED, createCompletedAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, 
AMContainerEventType.C_STOP_REQUEST, createStopRequestAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, 
AMContainerEventType.C_TIMED_OUT, createTimedOutAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, 
AMContainerEventType.C_STOP_REQUEST, createStopRequestAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, 
AMContainerEventType.C_TIMED_OUT, createTimedOutAtIdleTransition())
         .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, 
AMContainerEventType.C_NODE_FAILED, createNodeFailedAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, 
EnumSet.of(AMContainerEventType.C_START_REQUEST, 
AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, 
AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED), 
createGenericErrorAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, 
EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, 
AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, 
AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, 
AMContainerEventType.C_NM_STOP_FAILED), createGenericErrorAtIdleTransition())
         
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, 
AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, 
AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, 
createAssignTaskAttemptAtRunningTransition())
         .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, 
AMContainerEventType.C_PULL_TA)
         .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, 
AMContainerEventType.C_TA_SUCCEEDED, createTASucceededAtRunningTransition())
         .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, 
AMContainerEventType.C_COMPLETED, createCompletedAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, 
AMContainerEventType.C_STOP_REQUEST, createStopRequestAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, 
AMContainerEventType.C_TIMED_OUT, createTimedOutAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, 
AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, 
createStopRequestAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, 
AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, 
createTimedOutAtRunningTransition())
         .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, 
AMContainerEventType.C_NODE_FAILED, createNodeFailedAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, 
EnumSet.of(AMContainerEventType.C_START_REQUEST, 
AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, 
AMContainerEventType.C_STOP_FAILED), createGenericErrorAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, 
AMContainerState.STOP_REQUESTED, 
EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, 
AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, 
AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), 
createGenericErrorAtRunningTransition())
         
+        .addTransition(AMContainerState.STOP_REQUESTED, 
AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, 
createAssignTAAtStoppingTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, 
AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, 
createCompletedAtStoppingTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, 
AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
+        .addTransition(AMContainerState.STOP_REQUESTED, 
AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, 
createStopFailedAtNMStopRequested()) // TODO XXX: Rename these.
+        .addTransition(AMContainerState.STOP_REQUESTED, 
AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, 
createNodeFailedAtNMStopRequestedTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, 
AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, 
AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, 
AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, 
AMContainerEventType.C_TIMED_OUT))
+        .addTransition(AMContainerState.STOP_REQUESTED, 
AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, 
createGenericErrorAtStoppingTransition())
         
         .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, 
AMContainerEventType.C_ASSIGN_TA, createAssignTAAtStoppingTransition())
         .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, 
AMContainerEventType.C_COMPLETED, createCompletedAtStoppingTransition())
-        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, 
AMContainerEventType.C_NODE_FAILED, createNodeFailedBaseTransition())
-        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, 
EnumSet.of(AMContainerEventType.C_LAUNCHED, 
AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, 
AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, 
AMContainerEventType.C_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
-        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, 
AMContainerEventType.C_START_REQUEST, createGenericErrorAtStoppingTransition())
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, 
AMContainerEventType.C_NODE_FAILED, createNodeFailedAtStoppingTransition())
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, 
EnumSet.of(AMContainerEventType.C_LAUNCHED, 
AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, 
AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, 
AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, 
AMContainerEventType.C_TIMED_OUT))
+        .addTransition(AMContainerState.STOP_REQUESTED, 
AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, 
createGenericErrorAtStoppingTransition())
         
         .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, 
AMContainerEventType.C_ASSIGN_TA, createAssignTAAtCompletedTransition())
-        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, 
AMContainerEventType.C_NODE_FAILED, createNodeFailedBaseTransition())
-        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, 
EnumSet.of(AMContainerEventType.C_START_REQUEST, 
AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, 
AMContainerEventType.C_STOP_FAILED), createGenericErrorAtStoppingTransition())
-        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, 
EnumSet.of(AMContainerEventType.C_STOP_REQUEST, 
AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, 
AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, 
AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, 
AMContainerEventType.C_TIMED_OUT))
- 
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, 
AMContainerEventType.C_NODE_FAILED, createNodeFailedAtCompletedTransition())
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, 
EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, 
AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, 
AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, 
AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, 
AMContainerEventType.C_TIMED_OUT))
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, 
EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST), 
createGenericErrorAtStoppingTransition())
+
         .installTopology();
   }
 
@@ -335,8 +340,8 @@ public class AMContainerImpl implements 
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       container.inError = true;
       container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
-          "AMScheduler Error: TaskAttempt should not be" +
-          " allocated before a launch request.");
+          "AMScheduler Error: TaskAttempt allocated to unlaunched container: "
+              + container.getContainerId());
       container.sendCompletedToScheduler();
       container.deAllocate();
       LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId()
@@ -387,6 +392,10 @@ public class AMContainerImpl implements 
     }
   }
   
+  protected void registerFailedTAAssignment(TaskAttemptId taId) {
+    failedAssignments.add(taId);
+  }
+  
   protected void deAllocate() {
     sendEvent(new RMCommunicatorContainerDeAllocateRequestEvent(containerId));
   }
@@ -396,15 +405,17 @@ public class AMContainerImpl implements 
   }
 
   protected void sendTerminatedToTaskAttempt(TaskAttemptId taId, String 
message) {
-    if (message != null) {
-      sendEvent(new TaskAttemptDiagnosticsUpdateEvent(taId, message));
-    }
-    sendEvent(new TaskAttemptEventTerminated(taId));
+    sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
   }
 
-  protected void sendKillRequestToTaskAttempt(TaskAttemptId taId) {
-    sendEvent(new TaskAttemptEventKillRequest(taId,
-        "Node running the contianer failed"));
+  protected void sendTerminatingToTA(TaskAttemptId taId, String message) {
+    sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
+  }
+  
+  protected void sendNodeFailureToTA(AMContainerEvent event,
+      TaskAttemptId taId, String message) {
+    sendEvent(new TaskAttemptEventNodeFailed(taId, message));
+    // TODO XXX: Diag message from the node. Otherwise include the nodeId
   }
 
   protected void sendStopRequestToNM() {
@@ -439,11 +450,14 @@ public class AMContainerImpl implements 
         container.inError = true;
         String errorMessage = "AMScheduler Error: Multiple simultaneous " +
                        "taskAttempt allocations to: " + 
container.getContainerId();
-        container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
-            errorMessage);
-        container.deAllocate();
+        container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+        container.registerFailedTAAssignment(event.getTaskAttemptId());
+        // TODO XXX: Verify that it's ok to send in a NM_STOP_REQUEST. The
+        // NMCommunicator should be able to handle this. The STOP_REQUEST would
+        // only go out after the START_REQUEST.
         LOG.warn(errorMessage);
-        return AMContainerState.STOPPING;
+        container.sendStopRequestToNM();
+        return AMContainerState.STOP_REQUESTED;
       }
       container.pendingAttempt = event.getTaskAttemptId();
       container.remoteTaskMap.put(event.getTaskAttemptId(),
@@ -490,7 +504,7 @@ public class AMContainerImpl implements 
         container.pendingAttempt = null;
         if (container.lastTaskFinishTime != 0) {
           long idleTimeDiff = System.currentTimeMillis() - 
container.lastTaskFinishTime;
-          LOG.info("Computing idle time for container: " + 
container.getContainerId() + ", lastFinishTime: " + 
container.lastTaskFinishTime + ", Incremented by: " + idleTimeDiff);
+          LOG.info("XXX: Computing idle time for container: " + 
container.getContainerId() + ", lastFinishTime: " + 
container.lastTaskFinishTime + ", Incremented by: " + idleTimeDiff);
           container.idleTimeBetweenTasks += System.currentTimeMillis() - 
container.lastTaskFinishTime;
         }
         LOG.info("XXX: Assigned task + [" + container.runningAttempt + "] to 
container: [" + container.getContainerId() + "]");
@@ -512,8 +526,8 @@ public class AMContainerImpl implements 
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       if (container.pendingAttempt != null) {
         AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) 
cEvent;
-        container.sendEvent(new TaskAttemptDiagnosticsUpdateEvent(
-            container.pendingAttempt, event.getMessage()));
+        container.sendTerminatingToTA(container.pendingAttempt,
+            event.getMessage());
       }
       container.deAllocate();
     }
@@ -531,7 +545,8 @@ public class AMContainerImpl implements 
       if (container.pendingAttempt != null) {
         String errorMessage = "Container" + container.getContainerId()
             + " failed. Received COMPLETED event while trying to launch";
-        
container.sendTerminatedToTaskAttempt(container.pendingAttempt,errorMessage);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+            errorMessage);
         LOG.warn(errorMessage);    
         // TODO XXX Maybe nullify pendingAttempt.
       }
@@ -548,11 +563,14 @@ public class AMContainerImpl implements 
       SingleArcTransition<AMContainerImpl, AMContainerEvent> {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
+      if (container.pendingAttempt != null) {
+        container.sendTerminatingToTA(container.pendingAttempt,
+            " Container" + container.getContainerId() + " received a 
STOP_REQUEST");
+      }
       container.sendStopRequestToNM();
-      container.deAllocate();
     }
   }
-  
+
   protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
       createNodeFailedAtLaunchingTransition() {
     return new NodeFailedAtLaunching();
@@ -563,7 +581,10 @@ public class AMContainerImpl implements 
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       if (container.pendingAttempt != null) {
-        container.sendKillRequestToTaskAttempt(container.pendingAttempt);
+        container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null);
+        // TODO XXX: Maybe include a diagnostic message along with the incoming
+        // Node failure event.
+        container.sendTerminatingToTA(container.pendingAttempt, "Node 
failure");
       }
       container.sendStopRequestToNM();
       container.deAllocate();
@@ -575,7 +596,7 @@ public class AMContainerImpl implements 
     return new AssignTaskAttemptAtIdle();
   }
 
-  // TODO Make this the base for all assignRequests. Some more error checking 
in
+  // TODO XXX Make this the base for all assignRequests. Some more error 
checking in
   // that case.
   protected static class AssignTaskAttemptAtIdle
       implements
@@ -588,17 +609,16 @@ public class AMContainerImpl implements 
         container.inError = true;
         String errorMessage = "AMScheduler Error: Multiple simultaneous "
             + "taskAttempt allocations to: " + container.getContainerId();
-        container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
-            errorMessage);
+        container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+        container.registerFailedTAAssignment(event.getTaskAttemptId());
         LOG.warn(errorMessage);
         container.sendStopRequestToNM();
-        container.deAllocate();
         container.containerHeartbeatHandler.unregister(container.containerId);
         
-        return AMContainerState.STOPPING;
+        return AMContainerState.STOP_REQUESTED;
       }
       container.pendingAttempt = event.getTaskAttemptId();
-      // TODO LATER. Cleanup the remoteTaskMap.
+      // TODO XXX LATER. Cleanup the remoteTaskMap.
       container.remoteTaskMap.put(event.getTaskAttemptId(),
           event.getRemoteTask());
       return AMContainerState.IDLE;
@@ -617,10 +637,12 @@ public class AMContainerImpl implements 
       LOG.info("Cotnainer with id: " + container.getContainerId()
           + " Completed." + " Previous state was: " + container.getState());
       if (container.pendingAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+            "Container " + container.getContainerId() + " FINISHED.");
       }
       container.sendCompletedToScheduler();
       container.containerHeartbeatHandler.unregister(container.containerId);
+      container.unregisterJvmFromListener(container.jvmId);
     }
   }
   
@@ -629,16 +651,13 @@ public class AMContainerImpl implements 
     return new StopRequestAtIdle();
   }
   
-  protected static class StopRequestAtIdle implements
-      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+  protected static class StopRequestAtIdle extends StopRequestAtLaunching {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
+      super.transition(container, cEvent);
       LOG.info("XXX: IdleTimeBetweenTasks: " + container.idleTimeBetweenTasks);
-      container.sendStopRequestToNM();
-      container.deAllocate();
       container.containerHeartbeatHandler.unregister(container.containerId);
       container.unregisterJvmFromListener(container.jvmId);
-      // TODO XXXXXXXXX: Unregister from TAL so that the Container kills 
itself (via a kill task assignment)
     }
   }
 
@@ -648,6 +667,7 @@ public class AMContainerImpl implements 
   }
 
   protected static class TimedOutAtIdle extends StopRequestAtIdle {
+    // TODO XXX: Override to change the diagnostic message that goes to the 
TaskAttempt. Functionality is the same.
   }
   
   protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
@@ -675,15 +695,13 @@ public class AMContainerImpl implements 
       SingleArcTransition<AMContainerImpl, AMContainerEvent> {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
-      container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
+      container.sendTerminatedToTaskAttempt(container.runningAttempt,
+          "Container " + container.getContainerId()
+              + " FINISHED while task was running");
       container.sendCompletedToScheduler();
       container.containerHeartbeatHandler.unregister(container.containerId);
       container.unregisterAttemptFromListener(container.runningAttempt);
       container.unregisterJvmFromListener(container.jvmId);
-      container.interruptedEvent = container.runningAttempt;
-      container.runningAttempt = null;
-      
-      
     }
   }
 
@@ -696,10 +714,9 @@ public class AMContainerImpl implements 
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       super.transition(container, cEvent);
       container.unregisterAttemptFromListener(container.runningAttempt);
-//      container.unregisterJvmFromListener(container.jvmId);
+      container.sendTerminatingToTA(container.runningAttempt,
+          " Container" + container.getContainerId() + " received a 
STOP_REQUEST");
       // TODO XXX: All running transition. verify whether runningAttempt 
should be null.
-      container.interruptedEvent = container.runningAttempt;
-      container.runningAttempt = null;
     }
   }
 
@@ -709,6 +726,7 @@ public class AMContainerImpl implements 
   }
 
   protected static class TimedOutAtRunning extends StopRequestAtRunning {
+    // TODO XXX: Change the error message.
   }
 
   protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
@@ -721,12 +739,10 @@ public class AMContainerImpl implements 
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       super.transition(container, cEvent);
-      container.sendKillRequestToTaskAttempt(container.runningAttempt);
+      container.sendNodeFailureToTA(cEvent, container.runningAttempt, null);
+      container.sendTerminatingToTA(container.runningAttempt, "Node failure");
+
       container.unregisterAttemptFromListener(container.runningAttempt);
-      container.unregisterJvmFromListener(container.jvmId);
-      container.interruptedEvent = container.runningAttempt;
-      container.runningAttempt = null;
-      
     }
   }
  
@@ -744,9 +760,9 @@ public class AMContainerImpl implements 
       container.inError = true;
       String errorMessage = "AttemptId: " + event.getTaskAttemptId()
           + " cannot be allocated to container: " + container.getContainerId()
-          + " in STOPPING state";
-      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
-          errorMessage);
+          + " in " + container.getState() + " state";
+      container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+      container.registerFailedTAAssignment(event.getTaskAttemptId());
     }
   }
 
@@ -761,6 +777,7 @@ public class AMContainerImpl implements 
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       container.inError = true;
+      // TODO XXX: Anything else required in the error transitions ?
     }
   }
 
@@ -791,22 +808,32 @@ public class AMContainerImpl implements 
       SingleArcTransition<AMContainerImpl, AMContainerEvent> {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
-      // XXX: Would some of these events not have gone out when entering the 
STOPPING state. Fix errorMessages
+      // TODO XXX: Set everything to null after sending these out.
       if (container.pendingAttempt != null) {
         container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
       }
       if (container.runningAttempt != null) {
         container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
       }
-      if (container.interruptedEvent != null) {
-        container.sendTerminatedToTaskAttempt(container.interruptedEvent, 
null);
-      }
       container.sendCompletedToScheduler();
     }
   }
 
+  protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
+      createStopFailedAtNMStopRequested() {
+    return new StopFailedAtNMStopRequested();
+  }
+
+  protected static class StopFailedAtNMStopRequested implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
+      container.deAllocate();
+    }
+  }
 
-  protected SingleArcTransition<AMContainerImpl, AMContainerEvent> 
createNodeFailedBaseTransition() {
+  protected SingleArcTransition<AMContainerImpl, AMContainerEvent> 
+      createNodeFailedBaseTransition() {
     return new NodeFailedBase();
   }
   
@@ -820,43 +847,96 @@ public class AMContainerImpl implements 
       // let multiple events go out and the TA should be able to handle them.
       // Kill_TA going out in this case.
       if (container.runningAttempt != null) {
-        container.killTaskAttempt(container.runningAttempt);
+        container.sendNodeFailureToTA(cEvent, container.runningAttempt, null);
+        container.sendTerminatingToTA(container.runningAttempt, "Node 
Failure");
       }
       if (container.pendingAttempt != null) {
-        container.killTaskAttempt(container.pendingAttempt);
+        container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null);
       }
       for (TaskAttemptId attemptId : container.completedAttempts) {
         // TODO XXX: Make sure TaskAttempt knows how to handle kills to 
REDUCEs.
-//        if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
-          container.killTaskAttempt(attemptId);
-//        }s
+        container.sendNodeFailureToTA(cEvent, attemptId, null);
       }
-      
     }
   }
   
-  private void killTaskAttempt(TaskAttemptId attemptId) {
-    sendEvent(new TaskAttemptEventKillRequest(attemptId, "The node running the 
task attempt was marked as bad"));
+  protected SingleArcTransition<AMContainerImpl, AMContainerEvent> 
+      createNodeFailedAtStoppingTransition() {
+    return new NodeFailedAtSopping();
   }
   
+  protected static class NodeFailedAtSopping extends NodeFailedBase {
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
+      super.transition(container, cEvent);
+      if (container.runningAttempt != null) { 
+        container.sendTerminatingToTA(container.runningAttempt, "Node 
Failure");
+      }
+    }
+  }
+
+  protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
+      createNodeFailedAtCompletedTransition() {
+    return new NodeFailedAtCompleted();
+  }
+
+  protected static class NodeFailedAtCompleted extends NodeFailedBase {
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
+      super.transition(container, cEvent);
+      if (container.runningAttempt != null) {
+        container.sendTerminatedToTaskAttempt(container.runningAttempt,
+            "Node Failure");
+      }
+    }
+  }
+
+  protected SingleArcTransition<AMContainerImpl, AMContainerEvent> 
createNodeFailedAtNMStopRequestedTransition() {
+    return new NodeFailedAtNMStopRequested();
+  }
+
+  protected static class NodeFailedAtNMStopRequested implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
+      if (container.runningAttempt != null) {
+        container.sendNodeFailureToTA(cEvent, container.runningAttempt,
+            null);
+        container.sendTerminatingToTA(container.runningAttempt, "Node 
Failure");
+      }
+      if (container.pendingAttempt != null) {
+        container.sendNodeFailureToTA(cEvent, container.pendingAttempt,
+            null);
+      }
+      for (TaskAttemptId attemptId : container.completedAttempts) {
+        // TODO XXX: Make sure TaskAttempt knows how to handle kills to 
REDUCEs.
+        container.sendNodeFailureToTA(cEvent, attemptId, null);
+      }
+      for (TaskAttemptId attemptId : container.failedAssignments) {
+        container.sendNodeFailureToTA(cEvent, attemptId, null);
+      }
+      container.deAllocate();
+    }
+  }
+
   protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
       createNodeFailedAtIdleTransition() {
     return new NodeFailedAtIdle();
   }
-  
-  protected static class NodeFailedAtIdle implements 
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
-    
+
+  protected static class NodeFailedAtIdle implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       container.sendStopRequestToNM();
       container.deAllocate();
       if (container.pendingAttempt != null) {
-        container.sendKillRequestToTaskAttempt(container.pendingAttempt);
+        container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null);
+        container.sendTerminatingToTA(container.pendingAttempt, "Node 
Failure");
       }
       for (TaskAttemptId taId : container.completedAttempts) {
-        container.sendKillRequestToTaskAttempt(taId);
+        container.sendNodeFailureToTA(cEvent, taId, null);
       }
       container.containerHeartbeatHandler.unregister(container.containerId);
+      container.unregisterJvmFromListener(container.jvmId);
     }
   }
 
@@ -873,16 +953,18 @@ public class AMContainerImpl implements 
       container.inError = true;
       String errorMessage = "AttemptId: " + event.getTaskAttemptId()
           + " cannot be allocated to container: " + container.getContainerId()
-          + " in RUNNING state";
-      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), 
errorMessage);
+          + " in RUNNING state. Already executing TaskAttempt: "
+          + container.runningAttempt;
+      container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+      container.registerFailedTAAssignment(event.getTaskAttemptId());
+      
+      container.sendTerminatingToTA(container.runningAttempt, errorMessage);
+      
       container.sendStopRequestToNM();
-      container.deAllocate();
       container.unregisterAttemptFromListener(container.runningAttempt);
       container.unregisterJvmFromListener(container.jvmId);
       container.containerHeartbeatHandler.unregister(container.containerId);
-      container.interruptedEvent = container.runningAttempt;
-      container.runningAttempt = null;
-      // TODO XXX: Is the TAL unregister required ?
+
     }
   }
 
@@ -926,6 +1008,7 @@ public class AMContainerImpl implements 
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       super.transition(container, cEvent);
       container.containerHeartbeatHandler.unregister(container.containerId);
+      container.unregisterJvmFromListener(container.jvmId);
     }
   }
   
@@ -939,12 +1022,11 @@ public class AMContainerImpl implements 
       super.transition(container, cEvent);
       container.unregisterAttemptFromListener(container.runningAttempt);
       container.unregisterJvmFromListener(container.jvmId);
-      container.interruptedEvent = container.runningAttempt;
-      container.runningAttempt = null;
     }
   }
 
   // TODO Create a generic ERROR state. Container tries informing relevant 
components in this case.
+  // TODO XXX: Rename all generic error transitions.
   
 
 }

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java
 (original)
+++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java
 Mon Oct 15 21:09:59 2012
@@ -38,7 +38,7 @@ public class AMContainerLaunchRequestEve
   public AMContainerLaunchRequestEvent(ContainerId containerId, JobId jobId,
       TaskType taskType, Token<JobTokenIdentifier> jobToken,
       Credentials credentials, boolean shouldProfile, JobConf jobConf) {
-    super(containerId, AMContainerEventType.C_START_REQUEST);
+    super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
     this.jobId = jobId;
     this.taskTypeForContainer = taskType;
     this.jobToken = jobToken;

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java
 (original)
+++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java
 Mon Oct 15 21:09:59 2012
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.container;
 
 public enum AMContainerState {
@@ -5,6 +22,9 @@ public enum AMContainerState {
   LAUNCHING,
   IDLE,
   RUNNING,
+  // indicates a NM stop request has been attempted. This request could fail, 
in
+  // which case an RM stop request needs to be sent.
+  STOP_REQUESTED, 
   STOPPING,
   COMPLETED,
 }

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java
 (original)
+++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java
 Mon Oct 15 21:09:59 2012
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.node;
 
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -11,7 +28,8 @@ public class AMNodeEventTaskAttemptEnded
   private final ContainerId containerId;
   private final TaskAttemptId taskAttemptId;
   
-  public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId, 
TaskAttemptId taskAttemptId, boolean failed) {
+  public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId,
+      TaskAttemptId taskAttemptId, boolean failed) {
     super(nodeId, AMNodeEventType.N_TA_ENDED);
     this.failed = failed;
     this.containerId = containerId;

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java
 (original)
+++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java
 Mon Oct 15 21:09:59 2012
@@ -29,7 +29,7 @@ public enum AMNodeEventType {
   //Producer: RMCommunicator
   N_TURNED_UNHEALTHY,
   N_TURNED_HEALTHY,
-  N_NODE_COUNT_UPDATED,
+  N_NODE_COUNT_UPDATED, // for blacklisting.
   
   //Producer: AMNodeManager
   N_BLACKLISTING_ENABLED,

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java
 (original)
+++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java
 Mon Oct 15 21:09:59 2012
@@ -259,6 +259,7 @@ public class AMNodeImpl implements AMNod
               AMNodeEventType.N_NODE_WAS_BLACKLISTED));
           return AMNodeState.BLACKLISTED;
           // TODO XXX: An event likely needs to go out to the scheduler.
+          // XXX Someone needs to update the scheduler tables - send a ZEROd 
request to the scheduler. Who's doing that ?
         }
       }
       return AMNodeState.ACTIVE;
@@ -378,6 +379,7 @@ public class AMNodeImpl implements AMNod
       LOG.info("Node: " + node.getNodeId()
           + " got allocated a contaienr with id: " + event.getContainerId()
           + " while in UNHEALTHY state. Releasing it.");
+      // TODO XXX: Maybe consider including some diagnostics with this event. 
(RM reported NODE as unhealthy maybe). Which would then be included in 
diagnostics from the Container.
       node.sendEvent(new AMContainerEvent(event.getContainerId(),
           AMContainerEventType.C_NODE_FAILED));
     }

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
 (original)
+++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
 Mon Oct 15 21:09:59 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import static junit.framework.Assert.assertFalse;
 import static junit.framework.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -51,7 +50,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
 
 public class TestJobHistoryEventHandler {
 

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
 (original)
+++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
 Mon Oct 15 21:09:59 2012
@@ -72,8 +72,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
 import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
@@ -261,7 +260,8 @@ public class MRApp extends MRAppMaster {
     TaskAttemptReport report = attempt.getReport();
     while (!finalState.equals(report.getTaskAttemptState()) &&
         timeoutSecs++ < 20) {
-      System.out.println("TaskAttempt State is : " + 
report.getTaskAttemptState() +
+      System.out.println("TaskAttempt State for " + attempt.getID() + " is : " 
+ 
+          report.getTaskAttemptState() +
           " Waiting for state : " + finalState +
           "   progress : " + report.getProgress());
       report = attempt.getReport();
@@ -651,24 +651,27 @@ public class MRApp extends MRAppMaster {
                 .getRemoteTask()));
 
         break;
-      case S_TA_STOP_REQUEST:
+      case S_TA_ENDED:
         // Send out a Container_stop_request.
-        AMSchedulerTAStopRequestEvent stEvent = 
(AMSchedulerTAStopRequestEvent) rawEvent;
-        LOG.info("XXX: Handling S_TA_STOP_REQUEST for attemptId:" + 
stEvent.getAttemptID());
-        getContext().getEventHandler().handle(
-            new AMContainerEvent(attemptToContainerIdMap.get(stEvent
-                .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
-
-        break;
-      case S_TA_SUCCEEDED:
-        // No re-use in MRApp. Stop the container.
-        AMSchedulerTASucceededEvent suEvent = (AMSchedulerTASucceededEvent) 
rawEvent;
-        LOG.info("XXX: Handling S_TA_SUCCEEDED for attemptId: "
-            + suEvent.getAttemptID());
-        getContext().getEventHandler().handle(
-            new AMContainerEvent(attemptToContainerIdMap.get(suEvent
-                .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
-        break;
+        AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) rawEvent;
+        LOG.info("XXX: Handling S_TA_ENDED for attemptId:"
+            + sEvent.getAttemptID() + " with state: " + sEvent.getState());
+        switch (sEvent.getState()) {
+        case FAILED:
+        case KILLED:
+          getContext().getEventHandler().handle(
+              new AMContainerEvent(attemptToContainerIdMap.get(sEvent
+                  .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+          break;
+        case SUCCEEDED:
+          // No re-use in MRApp. Stop the container.
+          getContext().getEventHandler().handle(
+              new AMContainerEvent(attemptToContainerIdMap.get(sEvent
+                  .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+          break;
+        default:
+          throw new YarnException("Unexpected state: " + sEvent.getState());
+        }
       case S_CONTAINERS_ALLOCATED:
         break;
       case S_CONTAINER_COMPLETED:

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java
 (original)
+++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java
 Mon Oct 15 21:09:59 2012
@@ -195,7 +195,7 @@ public class TestFail {
     // TODO XXX: This may not be a valid test.
     app.getDispatcher().getEventHandler().handle(
         new TaskAttemptEvent(attempt.getID(),
-            TaskAttemptEventType.TA_TERMINATED));
+            TaskAttemptEventType.TA_CONTAINER_TERMINATED));
     app.waitForState(job, JobState.FAILED);
   }
 

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java
 (original)
+++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java
 Mon Oct 15 21:09:59 2012
@@ -58,7 +58,8 @@ public class TestMapReduceChildJVM {
       " -Dhadoop.root.logger=INFO,CLA" +
       " org.apache.hadoop.mapred.YarnChild2 127.0.0.1" +
       " 54321" +
-      " attempt_0_0000_m_000000_0" +
+      " job_0_0000" +
+      " MAP" +
       " 0" +
       " 1><LOG_DIR>/stdout" +
       " 2><LOG_DIR>/stderr ]", app.myCommandLine);

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
 (original)
+++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
 Mon Oct 15 21:09:59 2012
@@ -45,6 +45,7 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -815,6 +816,12 @@ public class TestRMContainerAllocator {
       super.handleEvent(event);
     }
     
+    @Override
+    protected boolean shouldProfileTaskAttempt(JobConf conf,
+        org.apache.hadoop.mapred.Task remoteTask) {
+      return false;
+    }
+    
     static Priority getMapPriority() {
       return BuilderUtils.newPriority(PRIORITY_MAP.getPriority());
     }
@@ -845,6 +852,12 @@ public class TestRMContainerAllocator {
         int numPendingReduces, float maxReduceRampupLimit, float 
reduceSlowStart) {
       recalculatedReduceSchedule = true;
     }
+    
+    @Override
+    protected boolean shouldProfileTaskAttempt(JobConf conf,
+        org.apache.hadoop.mapred.Task remoteTask) {
+      return false;
+    }
   }
 
   class TrackingAMContainerRequestor extends RMContainerRequestor {
@@ -928,7 +941,7 @@ public class TestRMContainerAllocator {
     
     @Override
     public void handle(Event event) {
-      if (event.getType() == AMContainerEventType.C_START_REQUEST) {
+      if (event.getType() == AMContainerEventType.C_LAUNCH_REQUEST) {
         launchRequests.add((AMContainerLaunchRequestEvent)event);
       } else if (event.getType() == AMContainerEventType.C_ASSIGN_TA) {
         assignEvents.add((AMContainerAssignTAEvent)event);
@@ -960,6 +973,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getID()).thenReturn(jobId);
     when(mockJob.getProgress()).thenReturn(0.0f);
+    when(mockJob.getConf()).thenReturn(conf);
 
     Clock clock = new ControlledClock(new SystemClock());
 


Reply via email to