http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 1807c1c..79b88d8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.MRApp; +import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; @@ -407,6 +408,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, @@ -464,6 +466,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, @@ -524,6 +527,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, @@ -546,7 +550,7 @@ public class TestTaskAttempt{ taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE)); taImpl.handle(new TaskAttemptEvent(attemptId, - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); assertEquals("Task attempt is not in succeeded state", taImpl.getState(), TaskAttemptState.SUCCEEDED); @@ -593,6 +597,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, @@ -641,6 +646,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, @@ -663,7 +669,7 @@ public class TestTaskAttempt{ taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE)); taImpl.handle(new TaskAttemptEvent(attemptId, - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); assertEquals("Task attempt is not in succeeded state", taImpl.getState(), TaskAttemptState.SUCCEEDED); @@ -708,6 +714,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, @@ -753,6 +760,7 @@ public class TestTaskAttempt{ AppContext appCtx = mock(AppContext.class); ClusterInfo clusterInfo = mock(ClusterInfo.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, @@ -774,7 +782,7 @@ public class TestTaskAttempt{ taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE)); taImpl.handle(new TaskAttemptEvent(attemptId, - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); assertEquals("Task attempt is not in succeeded state", taImpl.getState(), TaskAttemptState.SUCCEEDED); @@ -967,6 +975,255 @@ public class TestTaskAttempt{ taImpl.getInternalState()); } + + @Test + public void testKillMapTaskWhileSuccessFinishing() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); + + // If the map task is killed when it is in SUCCESS_FINISHING_CONTAINER + // state, the state will move to KILL_CONTAINER_CLEANUP + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_KILL)); + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.KILLED); + assertEquals("Task attempt's internal state is not KILL_CONTAINER_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + assertEquals("Task attempt's internal state is not KILL_TASK_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.KILL_TASK_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CLEANUP_DONE)); + + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.KILLED); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + @Test + public void testKillMapTaskWhileFailFinishing() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_FAILMSG)); + + assertEquals("Task attempt is not in FAILED state", taImpl.getState(), + TaskAttemptState.FAILED); + assertEquals("Task attempt's internal state is not " + + "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER); + + // If the map task is killed when it is in FAIL_FINISHING_CONTAINER state, + // the state will stay in FAIL_FINISHING_CONTAINER. + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_KILL)); + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.FAILED); + assertEquals("Task attempt's internal state is not " + + "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_TIMED_OUT)); + assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_TASK_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CLEANUP_DONE)); + + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.FAILED); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + @Test + public void testFailMapTaskByClient() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT)); + + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.FAILED); + assertEquals("Task attempt's internal state is not " + + "FAIL_CONTAINER_CLEANUP", taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_TASK_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CLEANUP_DONE)); + + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.FAILED); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + @Test + public void testTaskAttemptDiagnosticEventOnFinishing() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); + + // TA_DIAGNOSTICS_UPDATE doesn't change state + taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(taImpl.getID(), + "Task got updated")); + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + @Test + public void testTimeoutWhileSuccessFinishing() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); + + // If the task stays in SUCCESS_FINISHING_CONTAINER for too long, + // TaskAttemptListenerImpl will time out the attempt. + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_TIMED_OUT)); + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_CONTAINER_CLEANUP", taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + @Test + public void testTimeoutWhileFailFinishing() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_FAILMSG)); + + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.FAILED); + assertEquals("Task attempt's internal state is not " + + "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER); + + // If the task stays in FAIL_FINISHING_CONTAINER for too long, + // TaskAttemptListenerImpl will time out the attempt. + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_TIMED_OUT)); + assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + private void setupTaskAttemptFinishingMonitor( + EventHandler eventHandler, JobConf jobConf, AppContext appCtx) { + TaskAttemptFinishingMonitor taskAttemptFinishingMonitor = + new TaskAttemptFinishingMonitor(eventHandler); + taskAttemptFinishingMonitor.init(jobConf); + when(appCtx.getTaskAttemptFinishingMonitor()). + thenReturn(taskAttemptFinishingMonitor); + } + + private TaskAttemptImpl createTaskAttemptImpl( + MockEventHandler eventHandler) { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + splits, jobConf, taListener, + mock(Token.class), new Credentials(), + new SystemClock(), appCtx); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, + container, mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + return taImpl; + } + public static class MockEventHandler implements EventHandler { public boolean internalError;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 0c2a2e9..49345cd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -235,7 +235,15 @@ public interface MRJobConfig { public static final String TASK_TIMEOUT = "mapreduce.task.timeout"; public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms"; - + + public static final String TASK_EXIT_TIMEOUT = "mapreduce.task.exit.timeout"; + + public static final int TASK_EXIT_TIMEOUT_DEFAULT = 60 * 1000; + + public static final String TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.exit.timeout.check-interval-ms"; + + public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000; + public static final String TASK_ID = "mapreduce.task.id"; public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index d776d44..a5e76b3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1671,4 +1671,24 @@ app master. </description> </property> + +<property> + <name>mapreduce.task.exit.timeout</name> + <value>60000</value> + <description>The number of milliseconds before a task will be + terminated if it stays in finishing state for too long. + After a task attempt completes from TaskUmbilicalProtocol's point of view, + it will be transitioned to finishing state. That will give a chance for the + task to exit by itself. + </description> +</property> + +<property> + <name>mapreduce.task.exit.timeout.check-interval-ms</name> + <value>20000</value> + <description>The interval in milliseconds between which the MR framework + checks if task attempts stay in finishing state for too long. + </description> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 194b85a..41bc90a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor; import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; @@ -399,4 +400,9 @@ public class JobHistory extends AbstractService implements HistoryContext { // bogus - Not Required return null; } + + @Override + public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java index d2edd19..5ce2761 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java @@ -102,7 +102,7 @@ public class TestSpeculativeExecutionWithMRApp { appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), TaskAttemptEventType.TA_DONE)); appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED); } } @@ -170,7 +170,7 @@ public class TestSpeculativeExecutionWithMRApp { appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), TaskAttemptEventType.TA_DONE)); appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); numTasksToFinish--; app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED); } else { @@ -228,7 +228,7 @@ public class TestSpeculativeExecutionWithMRApp { appEventHandler.handle( new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE)); appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(), - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); return ta; }