Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java?rev=1398523&r1=1398522&r2=1398523&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java Mon Oct 15 21:09:59 2012 @@ -39,9 +39,9 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app2.AppContext; import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener; -import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent; -import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest; -import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventTerminated; +import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated; +import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminating; +import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventNodeFailed; import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted; import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent; import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent; @@ -61,10 +61,9 @@ import org.apache.hadoop.yarn.state.Stat public class AMContainerImpl implements AMContainer { private static final Log LOG = LogFactory.getLog(AMContainerImpl.class); - + private final ReadLock readLock; private final WriteLock writeLock; - // TODO Use ContainerId or a custom JvmId. private final ContainerId containerId; // Container to be used for getters on capability, locality etc. private final Container container; @@ -88,7 +87,7 @@ public class AMContainerImpl implements private TaskAttemptId pendingAttempt; private TaskAttemptId runningAttempt; - private TaskAttemptId interruptedEvent; + private List<TaskAttemptId> failedAssignments; private TaskAttemptId pullAttempt; private boolean inError = false; @@ -109,53 +108,59 @@ public class AMContainerImpl implements private void initStateMachineFactory() { stateMachineFactory = stateMachineFactory - .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_START_REQUEST, createLaunchRequestTransition()) + .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_LAUNCH_REQUEST, createLaunchRequestTransition()) .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtAllocatedTransition()) .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtAllocatedTransition()) .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_STOP_REQUEST, createStopRequestTransition()) .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtAllocatedTransition()) - .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorTransition()) + .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorTransition()) - .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOPPING), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptTransition()) + .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptTransition()) .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, AMContainerEventType.C_LAUNCHED, createLaunchedTransition()) .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_FAILED, createLaunchFailedTransition()) .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, AMContainerEventType.C_PULL_TA) // Is assuming the pullAttempt will be null. .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtLaunchingTransition()) - .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtLaunchingTransition()) + .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtLaunchingTransition()) .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtLaunchingTransition()) - .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorAtLaunchingTransition()) - - - .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOPPING), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtIdleTransition()) + .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorAtLaunchingTransition()) + + .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtIdleTransition()) .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), AMContainerEventType.C_PULL_TA, createPullTAAtIdleTransition()) .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtIdleTransition()) - .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtIdleTransition()) - .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_TIMED_OUT, createTimedOutAtIdleTransition()) + .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtIdleTransition()) + .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, createTimedOutAtIdleTransition()) .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtIdleTransition()) - .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED), createGenericErrorAtIdleTransition()) + .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), createGenericErrorAtIdleTransition()) - .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtRunningTransition()) + .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtRunningTransition()) .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, AMContainerEventType.C_PULL_TA) .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, AMContainerEventType.C_TA_SUCCEEDED, createTASucceededAtRunningTransition()) .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtRunningTransition()) - .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtRunningTransition()) - .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_TIMED_OUT, createTimedOutAtRunningTransition()) + .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtRunningTransition()) + .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, createTimedOutAtRunningTransition()) .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtRunningTransition()) - .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_STOP_FAILED), createGenericErrorAtRunningTransition()) + .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), createGenericErrorAtRunningTransition()) + .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, createAssignTAAtStoppingTransition()) + .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtStoppingTransition()) + .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT) + .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, createStopFailedAtNMStopRequested()) // TODO XXX: Rename these. + .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtNMStopRequestedTransition()) + .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT)) + .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, createGenericErrorAtStoppingTransition()) .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, createAssignTAAtStoppingTransition()) .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtStoppingTransition()) - .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedBaseTransition()) - .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_STOP_FAILED, AMContainerEventType.C_TIMED_OUT)) - .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_START_REQUEST, createGenericErrorAtStoppingTransition()) + .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtStoppingTransition()) + .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT)) + .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, createGenericErrorAtStoppingTransition()) .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, createAssignTAAtCompletedTransition()) - .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, createNodeFailedBaseTransition()) - .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_STOP_FAILED), createGenericErrorAtStoppingTransition()) - .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT)) - + .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtCompletedTransition()) + .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT)) + .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST), createGenericErrorAtStoppingTransition()) + .installTopology(); } @@ -335,8 +340,8 @@ public class AMContainerImpl implements AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent; container.inError = true; container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), - "AMScheduler Error: TaskAttempt should not be" + - " allocated before a launch request."); + "AMScheduler Error: TaskAttempt allocated to unlaunched container: " + + container.getContainerId()); container.sendCompletedToScheduler(); container.deAllocate(); LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId() @@ -387,6 +392,10 @@ public class AMContainerImpl implements } } + protected void registerFailedTAAssignment(TaskAttemptId taId) { + failedAssignments.add(taId); + } + protected void deAllocate() { sendEvent(new RMCommunicatorContainerDeAllocateRequestEvent(containerId)); } @@ -396,15 +405,17 @@ public class AMContainerImpl implements } protected void sendTerminatedToTaskAttempt(TaskAttemptId taId, String message) { - if (message != null) { - sendEvent(new TaskAttemptDiagnosticsUpdateEvent(taId, message)); - } - sendEvent(new TaskAttemptEventTerminated(taId)); + sendEvent(new TaskAttemptEventContainerTerminated(taId, message)); } - protected void sendKillRequestToTaskAttempt(TaskAttemptId taId) { - sendEvent(new TaskAttemptEventKillRequest(taId, - "Node running the contianer failed")); + protected void sendTerminatingToTA(TaskAttemptId taId, String message) { + sendEvent(new TaskAttemptEventContainerTerminating(taId, message)); + } + + protected void sendNodeFailureToTA(AMContainerEvent event, + TaskAttemptId taId, String message) { + sendEvent(new TaskAttemptEventNodeFailed(taId, message)); + // TODO XXX: Diag message from the node. Otherwise include the nodeId } protected void sendStopRequestToNM() { @@ -439,11 +450,14 @@ public class AMContainerImpl implements container.inError = true; String errorMessage = "AMScheduler Error: Multiple simultaneous " + "taskAttempt allocations to: " + container.getContainerId(); - container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), - errorMessage); - container.deAllocate(); + container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage); + container.registerFailedTAAssignment(event.getTaskAttemptId()); + // TODO XXX: Verify that it's ok to send in a NM_STOP_REQUEST. The + // NMCommunicator should be able to handle this. The STOP_REQUEST would + // only go out after the START_REQUEST. LOG.warn(errorMessage); - return AMContainerState.STOPPING; + container.sendStopRequestToNM(); + return AMContainerState.STOP_REQUESTED; } container.pendingAttempt = event.getTaskAttemptId(); container.remoteTaskMap.put(event.getTaskAttemptId(), @@ -490,7 +504,7 @@ public class AMContainerImpl implements container.pendingAttempt = null; if (container.lastTaskFinishTime != 0) { long idleTimeDiff = System.currentTimeMillis() - container.lastTaskFinishTime; - LOG.info("Computing idle time for container: " + container.getContainerId() + ", lastFinishTime: " + container.lastTaskFinishTime + ", Incremented by: " + idleTimeDiff); + LOG.info("XXX: Computing idle time for container: " + container.getContainerId() + ", lastFinishTime: " + container.lastTaskFinishTime + ", Incremented by: " + idleTimeDiff); container.idleTimeBetweenTasks += System.currentTimeMillis() - container.lastTaskFinishTime; } LOG.info("XXX: Assigned task + [" + container.runningAttempt + "] to container: [" + container.getContainerId() + "]"); @@ -512,8 +526,8 @@ public class AMContainerImpl implements public void transition(AMContainerImpl container, AMContainerEvent cEvent) { if (container.pendingAttempt != null) { AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent; - container.sendEvent(new TaskAttemptDiagnosticsUpdateEvent( - container.pendingAttempt, event.getMessage())); + container.sendTerminatingToTA(container.pendingAttempt, + event.getMessage()); } container.deAllocate(); } @@ -531,7 +545,8 @@ public class AMContainerImpl implements if (container.pendingAttempt != null) { String errorMessage = "Container" + container.getContainerId() + " failed. Received COMPLETED event while trying to launch"; - container.sendTerminatedToTaskAttempt(container.pendingAttempt,errorMessage); + container.sendTerminatedToTaskAttempt(container.pendingAttempt, + errorMessage); LOG.warn(errorMessage); // TODO XXX Maybe nullify pendingAttempt. } @@ -548,11 +563,14 @@ public class AMContainerImpl implements SingleArcTransition<AMContainerImpl, AMContainerEvent> { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { + if (container.pendingAttempt != null) { + container.sendTerminatingToTA(container.pendingAttempt, + " Container" + container.getContainerId() + " received a STOP_REQUEST"); + } container.sendStopRequestToNM(); - container.deAllocate(); } } - + protected SingleArcTransition<AMContainerImpl, AMContainerEvent> createNodeFailedAtLaunchingTransition() { return new NodeFailedAtLaunching(); @@ -563,7 +581,10 @@ public class AMContainerImpl implements @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { if (container.pendingAttempt != null) { - container.sendKillRequestToTaskAttempt(container.pendingAttempt); + container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null); + // TODO XXX: Maybe include a diagnostic message along with the incoming + // Node failure event. + container.sendTerminatingToTA(container.pendingAttempt, "Node failure"); } container.sendStopRequestToNM(); container.deAllocate(); @@ -575,7 +596,7 @@ public class AMContainerImpl implements return new AssignTaskAttemptAtIdle(); } - // TODO Make this the base for all assignRequests. Some more error checking in + // TODO XXX Make this the base for all assignRequests. Some more error checking in // that case. protected static class AssignTaskAttemptAtIdle implements @@ -588,17 +609,16 @@ public class AMContainerImpl implements container.inError = true; String errorMessage = "AMScheduler Error: Multiple simultaneous " + "taskAttempt allocations to: " + container.getContainerId(); - container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), - errorMessage); + container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage); + container.registerFailedTAAssignment(event.getTaskAttemptId()); LOG.warn(errorMessage); container.sendStopRequestToNM(); - container.deAllocate(); container.containerHeartbeatHandler.unregister(container.containerId); - return AMContainerState.STOPPING; + return AMContainerState.STOP_REQUESTED; } container.pendingAttempt = event.getTaskAttemptId(); - // TODO LATER. Cleanup the remoteTaskMap. + // TODO XXX LATER. Cleanup the remoteTaskMap. container.remoteTaskMap.put(event.getTaskAttemptId(), event.getRemoteTask()); return AMContainerState.IDLE; @@ -617,10 +637,12 @@ public class AMContainerImpl implements LOG.info("Cotnainer with id: " + container.getContainerId() + " Completed." + " Previous state was: " + container.getState()); if (container.pendingAttempt != null) { - container.sendTerminatedToTaskAttempt(container.pendingAttempt, null); + container.sendTerminatedToTaskAttempt(container.pendingAttempt, + "Container " + container.getContainerId() + " FINISHED."); } container.sendCompletedToScheduler(); container.containerHeartbeatHandler.unregister(container.containerId); + container.unregisterJvmFromListener(container.jvmId); } } @@ -629,16 +651,13 @@ public class AMContainerImpl implements return new StopRequestAtIdle(); } - protected static class StopRequestAtIdle implements - SingleArcTransition<AMContainerImpl, AMContainerEvent> { + protected static class StopRequestAtIdle extends StopRequestAtLaunching { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { + super.transition(container, cEvent); LOG.info("XXX: IdleTimeBetweenTasks: " + container.idleTimeBetweenTasks); - container.sendStopRequestToNM(); - container.deAllocate(); container.containerHeartbeatHandler.unregister(container.containerId); container.unregisterJvmFromListener(container.jvmId); - // TODO XXXXXXXXX: Unregister from TAL so that the Container kills itself (via a kill task assignment) } } @@ -648,6 +667,7 @@ public class AMContainerImpl implements } protected static class TimedOutAtIdle extends StopRequestAtIdle { + // TODO XXX: Override to change the diagnostic message that goes to the TaskAttempt. Functionality is the same. } protected SingleArcTransition<AMContainerImpl, AMContainerEvent> @@ -675,15 +695,13 @@ public class AMContainerImpl implements SingleArcTransition<AMContainerImpl, AMContainerEvent> { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { - container.sendTerminatedToTaskAttempt(container.runningAttempt, null); + container.sendTerminatedToTaskAttempt(container.runningAttempt, + "Container " + container.getContainerId() + + " FINISHED while task was running"); container.sendCompletedToScheduler(); container.containerHeartbeatHandler.unregister(container.containerId); container.unregisterAttemptFromListener(container.runningAttempt); container.unregisterJvmFromListener(container.jvmId); - container.interruptedEvent = container.runningAttempt; - container.runningAttempt = null; - - } } @@ -696,10 +714,9 @@ public class AMContainerImpl implements public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); container.unregisterAttemptFromListener(container.runningAttempt); -// container.unregisterJvmFromListener(container.jvmId); + container.sendTerminatingToTA(container.runningAttempt, + " Container" + container.getContainerId() + " received a STOP_REQUEST"); // TODO XXX: All running transition. verify whether runningAttempt should be null. - container.interruptedEvent = container.runningAttempt; - container.runningAttempt = null; } } @@ -709,6 +726,7 @@ public class AMContainerImpl implements } protected static class TimedOutAtRunning extends StopRequestAtRunning { + // TODO XXX: Change the error message. } protected SingleArcTransition<AMContainerImpl, AMContainerEvent> @@ -721,12 +739,10 @@ public class AMContainerImpl implements @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); - container.sendKillRequestToTaskAttempt(container.runningAttempt); + container.sendNodeFailureToTA(cEvent, container.runningAttempt, null); + container.sendTerminatingToTA(container.runningAttempt, "Node failure"); + container.unregisterAttemptFromListener(container.runningAttempt); - container.unregisterJvmFromListener(container.jvmId); - container.interruptedEvent = container.runningAttempt; - container.runningAttempt = null; - } } @@ -744,9 +760,9 @@ public class AMContainerImpl implements container.inError = true; String errorMessage = "AttemptId: " + event.getTaskAttemptId() + " cannot be allocated to container: " + container.getContainerId() - + " in STOPPING state"; - container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), - errorMessage); + + " in " + container.getState() + " state"; + container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage); + container.registerFailedTAAssignment(event.getTaskAttemptId()); } } @@ -761,6 +777,7 @@ public class AMContainerImpl implements @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { container.inError = true; + // TODO XXX: Anything else required in the error transitions ? } } @@ -791,22 +808,32 @@ public class AMContainerImpl implements SingleArcTransition<AMContainerImpl, AMContainerEvent> { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { - // XXX: Would some of these events not have gone out when entering the STOPPING state. Fix errorMessages + // TODO XXX: Set everything to null after sending these out. if (container.pendingAttempt != null) { container.sendTerminatedToTaskAttempt(container.pendingAttempt, null); } if (container.runningAttempt != null) { container.sendTerminatedToTaskAttempt(container.runningAttempt, null); } - if (container.interruptedEvent != null) { - container.sendTerminatedToTaskAttempt(container.interruptedEvent, null); - } container.sendCompletedToScheduler(); } } + protected SingleArcTransition<AMContainerImpl, AMContainerEvent> + createStopFailedAtNMStopRequested() { + return new StopFailedAtNMStopRequested(); + } + + protected static class StopFailedAtNMStopRequested implements + SingleArcTransition<AMContainerImpl, AMContainerEvent> { + @Override + public void transition(AMContainerImpl container, AMContainerEvent cEvent) { + container.deAllocate(); + } + } - protected SingleArcTransition<AMContainerImpl, AMContainerEvent> createNodeFailedBaseTransition() { + protected SingleArcTransition<AMContainerImpl, AMContainerEvent> + createNodeFailedBaseTransition() { return new NodeFailedBase(); } @@ -820,43 +847,96 @@ public class AMContainerImpl implements // let multiple events go out and the TA should be able to handle them. // Kill_TA going out in this case. if (container.runningAttempt != null) { - container.killTaskAttempt(container.runningAttempt); + container.sendNodeFailureToTA(cEvent, container.runningAttempt, null); + container.sendTerminatingToTA(container.runningAttempt, "Node Failure"); } if (container.pendingAttempt != null) { - container.killTaskAttempt(container.pendingAttempt); + container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null); } for (TaskAttemptId attemptId : container.completedAttempts) { // TODO XXX: Make sure TaskAttempt knows how to handle kills to REDUCEs. -// if (attemptId.getTaskId().getTaskType() == TaskType.MAP) { - container.killTaskAttempt(attemptId); -// }s + container.sendNodeFailureToTA(cEvent, attemptId, null); } - } } - private void killTaskAttempt(TaskAttemptId attemptId) { - sendEvent(new TaskAttemptEventKillRequest(attemptId, "The node running the task attempt was marked as bad")); + protected SingleArcTransition<AMContainerImpl, AMContainerEvent> + createNodeFailedAtStoppingTransition() { + return new NodeFailedAtSopping(); } + protected static class NodeFailedAtSopping extends NodeFailedBase { + public void transition(AMContainerImpl container, AMContainerEvent cEvent) { + super.transition(container, cEvent); + if (container.runningAttempt != null) { + container.sendTerminatingToTA(container.runningAttempt, "Node Failure"); + } + } + } + + protected SingleArcTransition<AMContainerImpl, AMContainerEvent> + createNodeFailedAtCompletedTransition() { + return new NodeFailedAtCompleted(); + } + + protected static class NodeFailedAtCompleted extends NodeFailedBase { + public void transition(AMContainerImpl container, AMContainerEvent cEvent) { + super.transition(container, cEvent); + if (container.runningAttempt != null) { + container.sendTerminatedToTaskAttempt(container.runningAttempt, + "Node Failure"); + } + } + } + + protected SingleArcTransition<AMContainerImpl, AMContainerEvent> createNodeFailedAtNMStopRequestedTransition() { + return new NodeFailedAtNMStopRequested(); + } + + protected static class NodeFailedAtNMStopRequested implements + SingleArcTransition<AMContainerImpl, AMContainerEvent> { + public void transition(AMContainerImpl container, AMContainerEvent cEvent) { + if (container.runningAttempt != null) { + container.sendNodeFailureToTA(cEvent, container.runningAttempt, + null); + container.sendTerminatingToTA(container.runningAttempt, "Node Failure"); + } + if (container.pendingAttempt != null) { + container.sendNodeFailureToTA(cEvent, container.pendingAttempt, + null); + } + for (TaskAttemptId attemptId : container.completedAttempts) { + // TODO XXX: Make sure TaskAttempt knows how to handle kills to REDUCEs. + container.sendNodeFailureToTA(cEvent, attemptId, null); + } + for (TaskAttemptId attemptId : container.failedAssignments) { + container.sendNodeFailureToTA(cEvent, attemptId, null); + } + container.deAllocate(); + } + } + protected SingleArcTransition<AMContainerImpl, AMContainerEvent> createNodeFailedAtIdleTransition() { return new NodeFailedAtIdle(); } - - protected static class NodeFailedAtIdle implements SingleArcTransition<AMContainerImpl, AMContainerEvent> { - + + protected static class NodeFailedAtIdle implements + SingleArcTransition<AMContainerImpl, AMContainerEvent> { + @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { container.sendStopRequestToNM(); container.deAllocate(); if (container.pendingAttempt != null) { - container.sendKillRequestToTaskAttempt(container.pendingAttempt); + container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null); + container.sendTerminatingToTA(container.pendingAttempt, "Node Failure"); } for (TaskAttemptId taId : container.completedAttempts) { - container.sendKillRequestToTaskAttempt(taId); + container.sendNodeFailureToTA(cEvent, taId, null); } container.containerHeartbeatHandler.unregister(container.containerId); + container.unregisterJvmFromListener(container.jvmId); } } @@ -873,16 +953,18 @@ public class AMContainerImpl implements container.inError = true; String errorMessage = "AttemptId: " + event.getTaskAttemptId() + " cannot be allocated to container: " + container.getContainerId() - + " in RUNNING state"; - container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), errorMessage); + + " in RUNNING state. Already executing TaskAttempt: " + + container.runningAttempt; + container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage); + container.registerFailedTAAssignment(event.getTaskAttemptId()); + + container.sendTerminatingToTA(container.runningAttempt, errorMessage); + container.sendStopRequestToNM(); - container.deAllocate(); container.unregisterAttemptFromListener(container.runningAttempt); container.unregisterJvmFromListener(container.jvmId); container.containerHeartbeatHandler.unregister(container.containerId); - container.interruptedEvent = container.runningAttempt; - container.runningAttempt = null; - // TODO XXX: Is the TAL unregister required ? + } } @@ -926,6 +1008,7 @@ public class AMContainerImpl implements public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); container.containerHeartbeatHandler.unregister(container.containerId); + container.unregisterJvmFromListener(container.jvmId); } } @@ -939,12 +1022,11 @@ public class AMContainerImpl implements super.transition(container, cEvent); container.unregisterAttemptFromListener(container.runningAttempt); container.unregisterJvmFromListener(container.jvmId); - container.interruptedEvent = container.runningAttempt; - container.runningAttempt = null; } } // TODO Create a generic ERROR state. Container tries informing relevant components in this case. + // TODO XXX: Rename all generic error transitions. }
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java?rev=1398523&r1=1398522&r2=1398523&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java Mon Oct 15 21:09:59 2012 @@ -38,7 +38,7 @@ public class AMContainerLaunchRequestEve public AMContainerLaunchRequestEvent(ContainerId containerId, JobId jobId, TaskType taskType, Token<JobTokenIdentifier> jobToken, Credentials credentials, boolean shouldProfile, JobConf jobConf) { - super(containerId, AMContainerEventType.C_START_REQUEST); + super(containerId, AMContainerEventType.C_LAUNCH_REQUEST); this.jobId = jobId; this.taskTypeForContainer = taskType; this.jobToken = jobToken; Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java?rev=1398523&r1=1398522&r2=1398523&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java Mon Oct 15 21:09:59 2012 @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package org.apache.hadoop.mapreduce.v2.app2.rm.container; public enum AMContainerState { @@ -5,6 +22,9 @@ public enum AMContainerState { LAUNCHING, IDLE, RUNNING, + // indicates a NM stop request has been attempted. This request could fail, in + // which case an RM stop request needs to be sent. + STOP_REQUESTED, STOPPING, COMPLETED, } Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java?rev=1398523&r1=1398522&r2=1398523&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java Mon Oct 15 21:09:59 2012 @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package org.apache.hadoop.mapreduce.v2.app2.rm.node; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -11,7 +28,8 @@ public class AMNodeEventTaskAttemptEnded private final ContainerId containerId; private final TaskAttemptId taskAttemptId; - public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId, TaskAttemptId taskAttemptId, boolean failed) { + public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId, + TaskAttemptId taskAttemptId, boolean failed) { super(nodeId, AMNodeEventType.N_TA_ENDED); this.failed = failed; this.containerId = containerId; Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java?rev=1398523&r1=1398522&r2=1398523&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java Mon Oct 15 21:09:59 2012 @@ -29,7 +29,7 @@ public enum AMNodeEventType { //Producer: RMCommunicator N_TURNED_UNHEALTHY, N_TURNED_HEALTHY, - N_NODE_COUNT_UPDATED, + N_NODE_COUNT_UPDATED, // for blacklisting. //Producer: AMNodeManager N_BLACKLISTING_ENABLED, Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java?rev=1398523&r1=1398522&r2=1398523&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java Mon Oct 15 21:09:59 2012 @@ -259,6 +259,7 @@ public class AMNodeImpl implements AMNod AMNodeEventType.N_NODE_WAS_BLACKLISTED)); return AMNodeState.BLACKLISTED; // TODO XXX: An event likely needs to go out to the scheduler. + // XXX Someone needs to update the scheduler tables - send a ZEROd request to the scheduler. Who's doing that ? } } return AMNodeState.ACTIVE; @@ -378,6 +379,7 @@ public class AMNodeImpl implements AMNod LOG.info("Node: " + node.getNodeId() + " got allocated a contaienr with id: " + event.getContainerId() + " while in UNHEALTHY state. Releasing it."); + // TODO XXX: Maybe consider including some diagnostics with this event. (RM reported NODE as unhealthy maybe). Which would then be included in diagnostics from the Container. node.sendEvent(new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NODE_FAILED)); } Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1398523&r1=1398522&r2=1398523&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Mon Oct 15 21:09:59 2012 @@ -18,7 +18,6 @@ package org.apache.hadoop.mapreduce.jobhistory; -import static junit.framework.Assert.assertFalse; import static junit.framework.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -51,7 +50,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.verification.VerificationMode; public class TestJobHistoryEventHandler { Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1398523&r1=1398522&r2=1398523&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java Mon Oct 15 21:09:59 2012 @@ -72,8 +72,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent; import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent; -import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent; -import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent; +import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded; import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor; import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent; @@ -261,7 +260,8 @@ public class MRApp extends MRAppMaster { TaskAttemptReport report = attempt.getReport(); while (!finalState.equals(report.getTaskAttemptState()) && timeoutSecs++ < 20) { - System.out.println("TaskAttempt State is : " + report.getTaskAttemptState() + + System.out.println("TaskAttempt State for " + attempt.getID() + " is : " + + report.getTaskAttemptState() + " Waiting for state : " + finalState + " progress : " + report.getProgress()); report = attempt.getReport(); @@ -651,24 +651,27 @@ public class MRApp extends MRAppMaster { .getRemoteTask())); break; - case S_TA_STOP_REQUEST: + case S_TA_ENDED: // Send out a Container_stop_request. - AMSchedulerTAStopRequestEvent stEvent = (AMSchedulerTAStopRequestEvent) rawEvent; - LOG.info("XXX: Handling S_TA_STOP_REQUEST for attemptId:" + stEvent.getAttemptID()); - getContext().getEventHandler().handle( - new AMContainerEvent(attemptToContainerIdMap.get(stEvent - .getAttemptID()), AMContainerEventType.C_STOP_REQUEST)); - - break; - case S_TA_SUCCEEDED: - // No re-use in MRApp. Stop the container. - AMSchedulerTASucceededEvent suEvent = (AMSchedulerTASucceededEvent) rawEvent; - LOG.info("XXX: Handling S_TA_SUCCEEDED for attemptId: " - + suEvent.getAttemptID()); - getContext().getEventHandler().handle( - new AMContainerEvent(attemptToContainerIdMap.get(suEvent - .getAttemptID()), AMContainerEventType.C_STOP_REQUEST)); - break; + AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) rawEvent; + LOG.info("XXX: Handling S_TA_ENDED for attemptId:" + + sEvent.getAttemptID() + " with state: " + sEvent.getState()); + switch (sEvent.getState()) { + case FAILED: + case KILLED: + getContext().getEventHandler().handle( + new AMContainerEvent(attemptToContainerIdMap.get(sEvent + .getAttemptID()), AMContainerEventType.C_STOP_REQUEST)); + break; + case SUCCEEDED: + // No re-use in MRApp. Stop the container. + getContext().getEventHandler().handle( + new AMContainerEvent(attemptToContainerIdMap.get(sEvent + .getAttemptID()), AMContainerEventType.C_STOP_REQUEST)); + break; + default: + throw new YarnException("Unexpected state: " + sEvent.getState()); + } case S_CONTAINERS_ALLOCATED: break; case S_CONTAINER_COMPLETED: Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java?rev=1398523&r1=1398522&r2=1398523&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java Mon Oct 15 21:09:59 2012 @@ -195,7 +195,7 @@ public class TestFail { // TODO XXX: This may not be a valid test. app.getDispatcher().getEventHandler().handle( new TaskAttemptEvent(attempt.getID(), - TaskAttemptEventType.TA_TERMINATED)); + TaskAttemptEventType.TA_CONTAINER_TERMINATED)); app.waitForState(job, JobState.FAILED); } Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java?rev=1398523&r1=1398522&r2=1398523&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java Mon Oct 15 21:09:59 2012 @@ -58,7 +58,8 @@ public class TestMapReduceChildJVM { " -Dhadoop.root.logger=INFO,CLA" + " org.apache.hadoop.mapred.YarnChild2 127.0.0.1" + " 54321" + - " attempt_0_0000_m_000000_0" + + " job_0_0000" + + " MAP" + " 0" + " 1><LOG_DIR>/stdout" + " 2><LOG_DIR>/stderr ]", app.myCommandLine); Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java?rev=1398523&r1=1398522&r2=1398523&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java Mon Oct 15 21:09:59 2012 @@ -45,6 +45,7 @@ import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; @@ -815,6 +816,12 @@ public class TestRMContainerAllocator { super.handleEvent(event); } + @Override + protected boolean shouldProfileTaskAttempt(JobConf conf, + org.apache.hadoop.mapred.Task remoteTask) { + return false; + } + static Priority getMapPriority() { return BuilderUtils.newPriority(PRIORITY_MAP.getPriority()); } @@ -845,6 +852,12 @@ public class TestRMContainerAllocator { int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) { recalculatedReduceSchedule = true; } + + @Override + protected boolean shouldProfileTaskAttempt(JobConf conf, + org.apache.hadoop.mapred.Task remoteTask) { + return false; + } } class TrackingAMContainerRequestor extends RMContainerRequestor { @@ -928,7 +941,7 @@ public class TestRMContainerAllocator { @Override public void handle(Event event) { - if (event.getType() == AMContainerEventType.C_START_REQUEST) { + if (event.getType() == AMContainerEventType.C_LAUNCH_REQUEST) { launchRequests.add((AMContainerLaunchRequestEvent)event); } else if (event.getType() == AMContainerEventType.C_ASSIGN_TA) { assignEvents.add((AMContainerAssignTAEvent)event); @@ -960,6 +973,7 @@ public class TestRMContainerAllocator { Job mockJob = mock(Job.class); when(mockJob.getID()).thenReturn(jobId); when(mockJob.getProgress()).thenReturn(0.0f); + when(mockJob.getConf()).thenReturn(conf); Clock clock = new ControlledClock(new SystemClock());