Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Thu Apr 11 05:02:45 2013
@@ -18,10 +18,21 @@ package org.apache.hadoop.mapreduce.v2.app;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -31,36 +42,66 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.Event;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.MapTaskImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestRecovery {
@@ -75,6 +116,7 @@ public class TestRecovery {
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
+
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
   * completely disappears because of failed launch, one attempt gets killed and
 
@@ -1011,6 +1053,423 @@ public class TestRecovery {
     app.verifyCompleted();
   }
 
+  @Test
+  public void testRecoverySuccessAttempt() {
+    LOG.info("--- START: testRecoverySuccessAttempt ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.TASK_FINISHED);
+    recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 1L);
+  }
+
+  @Test
+  public void testRecoveryAllFailAttempts() {
+    LOG.info("--- START: testRecoveryAllFailAttempts ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("FAILED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.TASK_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.FAILED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 2L);
+  }
+
+  @Test
+  public void testRecoveryTaskSuccessAllAttemptsFail() {
+    LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+    // check for one new attempt launched since successful attempt not found
+    TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000);
+    finalAttemptStates.put(taId3, TaskAttemptState.NEW);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 2L);
+  }
+
+  @Test
+  public void testRecoveryTaskSuccessAllAttemptsSucceed() {
+    LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsSucceed ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+    finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.TASK_FINISHED);
+    recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 0L);
+  }
+
+  @Test
+  public void testRecoveryAllAttemptsKilled() {
+    LOG.info("--- START: testRecoveryAllAttemptsKilled ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.KILLED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.KILLED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("KILLED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.KILLED);
+    finalAttemptStates.put(taId2, TaskAttemptState.KILLED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+    jobHistoryEvents.add(EventType.TASK_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.KILLED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 0L);
+  }
+
+  private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState,
+      Map<TaskAttemptID, TaskAttemptState> finalAttemptStates,
+      ArgumentCaptor<Event> arg, List<EventType> expectedJobHistoryEvents,
+      long expectedMapLaunches, long expectedFailedMaps) {
+
+    assertEquals("Final State of Task", finalState, checkTask.getState());
+
+    Map<TaskAttemptId, TaskAttempt> recoveredAttempts =
+        checkTask.getAttempts();
+    assertEquals("Expected Number of Task Attempts",
+        finalAttemptStates.size(), recoveredAttempts.size());
+    for (TaskAttemptID taID : finalAttemptStates.keySet()) {
+      assertEquals("Expected Task Attempt State",
+          finalAttemptStates.get(taID),
+          recoveredAttempts.get(TypeConverter.toYarn(taID)).getState());
+    }
+
+    Iterator<Event> ie = arg.getAllValues().iterator();
+    int eventNum = 0;
+    long totalLaunchedMaps = 0;
+    long totalFailedMaps = 0;
+    boolean jobTaskEventReceived = false;
+
+    while (ie.hasNext()) {
+      Object current = ie.next();
+      ++eventNum;
+      LOG.info(eventNum + " " + current.getClass().getName());
+      if (current instanceof JobHistoryEvent) {
+        JobHistoryEvent jhe = (JobHistoryEvent) current;
+        LOG.info(expectedJobHistoryEvents.get(0).toString() + " "
+            + jhe.getHistoryEvent().getEventType().toString() + " "
+            + jhe.getJobID());
+        assertEquals(expectedJobHistoryEvents.get(0),
+            jhe.getHistoryEvent().getEventType());
+        expectedJobHistoryEvents.remove(0);
+      } else if (current instanceof JobCounterUpdateEvent) {
+        JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
+
+        LOG.info("JobCounterUpdateEvent "
+            + jcue.getCounterUpdates().get(0).getCounterKey()
+            + " " + jcue.getCounterUpdates().get(0).getIncrementValue());
+        if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+            JobCounter.NUM_FAILED_MAPS) {
+          totalFailedMaps += jcue.getCounterUpdates().get(0)
+              .getIncrementValue();
+        } else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+            JobCounter.TOTAL_LAUNCHED_MAPS) {
+          totalLaunchedMaps += jcue.getCounterUpdates().get(0)
+              .getIncrementValue();
+        }
+      } else if (current instanceof JobTaskEvent) {
+        JobTaskEvent jte = (JobTaskEvent) current;
+        assertEquals(jte.getState(), finalState);
+        jobTaskEventReceived = true;
+      }
+    }
+    assertTrue(jobTaskEventReceived || (finalState == TaskState.RUNNING));
+    assertEquals("Did not process all expected JobHistoryEvents",
+        0, expectedJobHistoryEvents.size());
+    assertEquals("Expected Map Launches",
+        expectedMapLaunches, totalLaunchedMaps);
+    assertEquals("Expected Failed Maps",
+        expectedFailedMaps, totalFailedMaps);
+  }
+
+  private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
+
+    ApplicationId appId = BuilderUtils.newApplicationId(clusterTimestamp, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+
+    int partitions = 2;
+
+    Path remoteJobConfFile = mock(Path.class);
+    JobConf conf = new JobConf();
+    TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
+    Token<JobTokenIdentifier> jobToken =
+        (Token<JobTokenIdentifier>) mock(Token.class);
+    Credentials credentials = null;
+    Clock clock = new SystemClock();
+    int appAttemptId = 3;
+    MRAppMetrics metrics = mock(MRAppMetrics.class);
+    Resource minContainerRequirements = mock(Resource.class);
+    when(minContainerRequirements.getMemory()).thenReturn(1000);
+
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(
+        minContainerRequirements);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getClusterInfo()).thenReturn(clusterInfo);
+
+    TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
+    MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions,
+        eh, remoteJobConfFile, conf,
+        taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
+        appAttemptId, metrics, appContext);
+    return mapTask;
+  }
+
+  private TaskAttemptInfo getMockTaskAttemptInfo(TaskAttemptID tai,
+      TaskAttemptState tas) {
+
+    ContainerId ci = mock(ContainerId.class);
+    Counters counters = mock(Counters.class);
+    TaskType tt = TaskType.MAP;
+
+    long finishTime = System.currentTimeMillis();
+
+    TaskAttemptInfo mockTAinfo = mock(TaskAttemptInfo.class);
+
+    when(mockTAinfo.getAttemptId()).thenReturn(tai);
+    when(mockTAinfo.getContainerId()).thenReturn(ci);
+    when(mockTAinfo.getCounters()).thenReturn(counters);
+    when(mockTAinfo.getError()).thenReturn("");
+    when(mockTAinfo.getFinishTime()).thenReturn(finishTime);
+    when(mockTAinfo.getHostname()).thenReturn("localhost");
+    when(mockTAinfo.getHttpPort()).thenReturn(23);
+    when(mockTAinfo.getMapFinishTime()).thenReturn(finishTime - 1000L);
+    when(mockTAinfo.getPort()).thenReturn(24);
+    when(mockTAinfo.getRackname()).thenReturn("defaultRack");
+    when(mockTAinfo.getShuffleFinishTime()).thenReturn(finishTime - 2000L);
+    when(mockTAinfo.getShufflePort()).thenReturn(25);
+    when(mockTAinfo.getSortFinishTime()).thenReturn(finishTime - 3000L);
+    when(mockTAinfo.getStartTime()).thenReturn(finishTime - 10000);
+    when(mockTAinfo.getState()).thenReturn("task in progress");
+    when(mockTAinfo.getTaskStatus()).thenReturn(tas.toString());
+    when(mockTAinfo.getTaskType()).thenReturn(tt);
+    when(mockTAinfo.getTrackerName()).thenReturn("TrackerName");
+    return mockTAinfo;
+  }
+
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
@@ -1145,5 +1604,16 @@ public class TestRecovery {
   public static void main(String[] arg) throws Exception {
     TestRecovery test = new TestRecovery();
     test.testCrashed();
+    test.testMultipleCrashes();
+    test.testOutputRecovery();
+    test.testOutputRecoveryMapsOnly();
+    test.testRecoveryWithOldCommiter();
+    test.testSpeculative();
+    test.testRecoveryWithoutShuffleSecret();
+    test.testRecoverySuccessAttempt();
+    test.testRecoveryAllFailAttempts();
+    test.testRecoveryTaskSuccessAllAttemptsFail();
+    test.testRecoveryTaskSuccessAllAttemptsSucceed();
+    test.testRecoveryAllAttemptsKilled();
   }
 }
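[Note on the new tests above: they all share one verification idiom -- drive the MapTaskImpl state machine with a TaskRecoverEvent built from mocked job-history records, then capture every event the mocked AM event handler received and compare it against an expected sequence. A minimal sketch of that idiom follows; it reuses the patch's own helpers (getMockMapTask, mocked TaskInfo/OutputCommitter are assumed set up as in the tests), so it is illustrative rather than standalone.]

    // Sketch only: capture-and-inspect pattern used by the recovery tests.
    EventHandler mockEventHandler = mock(EventHandler.class);
    MapTaskImpl task = getMockMapTask(System.currentTimeMillis(), mockEventHandler);
    // mockTaskInfo and mockCommitter assumed built as in the tests above.
    task.handle(new TaskRecoverEvent(task.getID(), mockTaskInfo, mockCommitter, true));

    // Collect every event the task emitted while replaying history.
    ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
    verify(mockEventHandler, atLeast(1)).handle(
        (org.apache.hadoop.yarn.event.Event) captor.capture());
    for (Event e : captor.getAllValues()) {
      if (e instanceof JobHistoryEvent) {
        // compare ((JobHistoryEvent) e).getHistoryEvent().getEventType()
        // against the expected EventType sequence
      }
    }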
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Thu Apr 11 05:02:45 2013
@@ -316,7 +316,8 @@ import org.junit.Test;
       Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
           getDispatcher().getEventHandler(),
           getTaskAttemptListener(), getContext().getClock(),
-          isNewApiCommitter(), currentUser.getUserName(), getContext(),
+          getCommitter(), isNewApiCommitter(),
+          currentUser.getUserName(), getContext(),
           forcedState, diagnostic);
       ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Thu Apr 11 05:02:45 2013
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
@@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.JobAC
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus.State;
@@ -47,6 +49,7 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -57,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
@@ -69,7 +73,6 @@ import org.apache.hadoop.yarn.SystemCloc
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -133,7 +136,7 @@ public class TestJobImpl {
     JobImpl job = createStubbedJob(conf, dispatcher, 0);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    job.handle(new JobStartEvent(job.getID()));
     assertJobState(job, JobStateInternal.SUCCEEDED);
     dispatcher.stop();
     commitHandler.stop();
@@ -222,7 +225,7 @@
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.SETUP);
 
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
@@ -284,7 +287,7 @@
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.SETUP);
 
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
@@ -351,7 +354,7 @@
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.FAIL_ABORT);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -388,7 +391,7 @@
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.SETUP);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -428,7 +431,7 @@
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -439,7 +442,7 @@
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -450,7 +453,7 @@
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -461,7 +464,7 @@
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -472,7 +475,7 @@
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
@@ -490,7 +493,7 @@
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     job.handle(diagUpdateEvent);
     String diagnostics = job.getReport().getDiagnostics();
     Assert.assertNotNull(diagnostics);
@@ -501,7 +504,7 @@
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
     job.handle(diagUpdateEvent);
     diagnostics = job.getReport().getDiagnostics();
@@ -556,7 +559,7 @@
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
         null, new JobTokenSecretManager(), new Credentials(), null, null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     InitTransition initTransition = getInitTransition(2);
     JobEvent mockJobEvent = mock(JobEvent.class);
     initTransition.transition(job, mockJobEvent);
@@ -597,7 +600,7 @@
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.FAILED);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
@@ -661,7 +664,7 @@
     StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    job.handle(new JobStartEvent(job.getID()));
     assertJobState(job, JobStateInternal.RUNNING);
     return job;
   }
@@ -785,9 +788,9 @@
       boolean newApiCommitter, String user, int numSplits) {
     super(jobId, applicationAttemptId, conf, eventHandler,
         null, new JobTokenSecretManager(), new Credentials(),
-        new SystemClock(), null, MRAppMetrics.create(),
-        newApiCommitter, user, System.currentTimeMillis(), null, null, null,
-        null);
+        new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
+        MRAppMetrics.create(), null, newApiCommitter, user,
+        System.currentTimeMillis(), null, null, null, null);
     initTransition = getInitTransition(numSplits);
     localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Thu Apr 11 05:02:45 2013
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.TaskUmbi
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
@@ -80,7 +78,6 @@ public class TestTaskImpl {
   private Path remoteJobConfFile;
   private Credentials credentials;
   private Clock clock;
-  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private MRAppMetrics metrics;
   private TaskImpl mockTask;
   private ApplicationId appId;
@@ -104,13 +101,12 @@ public class TestTaskImpl {
         EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
         TaskAttemptListener taskAttemptListener,
         Token<JobTokenIdentifier> jobToken,
-        Credentials credentials, Clock clock,
-        Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
+        Credentials credentials, Clock clock, int startCount,
         MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
       super(jobId, taskType , partition, eventHandler,
           remoteJobConfFile, conf, taskAttemptListener,
          jobToken, credentials, clock,
-          completedTasksFromPreviousRun, startCount, metrics, appContext);
+          startCount, metrics, appContext);
       this.taskType = taskType;
     }
 
@@ -247,8 +243,7 @@ public class TestTaskImpl {
     return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
         credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext, taskType);
+        startCount, metrics, appContext, taskType);
   }
 
   @After
@@ -652,9 +647,7 @@ public class TestTaskImpl {
   public void testFailedTransitions() {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
-        credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext, TaskType.MAP) {
+        credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
       @Override
       protected int getMaxAttempts() {
         return 1;
@@ -721,9 +714,7 @@ public class TestTaskImpl {
   public void testCountersWithSpeculation() {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
-        credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext, TaskType.MAP) {
+        credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
       @Override
       protected int getMaxAttempts() {
         return 1;

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Thu Apr 11 05:02:45 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import com.google.common.base.Joiner;
@@ -525,4 +526,19 @@ public class JobHistoryUtils {
     sb.append(jobId.toString());
     return sb.toString();
   }
+
+  public static Path getPreviousJobHistoryPath(
+      Configuration conf, ApplicationAttemptId applicationAttemptId)
+      throws IOException {
+    String jobId =
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+          .toString();
+    String jobhistoryDir =
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
+    Path histDirPath = FileContext.getFileContext(conf).makeQualified(
+        new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+    return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
+        histDirPath, jobId, (applicationAttemptId.getAttemptId() - 1)));
+  }
 }
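[Note: the new getPreviousJobHistoryPath helper resolves the staging-directory history file written by the prior AM attempt, so a restarted AM can replay it on recovery. The following sketch of a caller is illustrative only -- the recovery wiring that would invoke it is not part of this diff, and BuilderUtils.newApplicationAttemptId is assumed available as in the YARN util package.]

    // Illustrative only: for AM attempt 2, locate the .jhist left by attempt 1.
    Configuration conf = new Configuration();
    ApplicationId appId = BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
    ApplicationAttemptId attempt2 = BuilderUtils.newApplicationAttemptId(appId, 2);
    // Resolves the staging-dir history file for attempt (2 - 1), qualified
    // against the staging directory's FileContext.
    Path prevHistory = JobHistoryUtils.getPreviousJobHistoryPath(conf, attempt2);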
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Apr 11 05:02:45 2013
@@ -422,6 +422,7 @@ public interface MRJobConfig {
   /** Enable job recovery.*/
   public static final String MR_AM_JOB_RECOVERY_ENABLE =
       MR_AM_PREFIX + "job.recovery.enable";
+  public static final boolean MR_AM_JOB_RECOVERY_ENABLE_DEFAULT = true;
 
   /**
    * Limit on the number of reducers that can be preempted to ensure that at
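[Note: the new constant gives the recovery flag an explicit default of true. A consumer would read it with the standard Configuration idiom; the snippet below is a sketch of that read, with the key and default taken from the patch and the surrounding code assumed.]

    // Illustrative read of the new flag; MRJobConfig supplies key and default.
    Configuration conf = new Configuration();
    boolean recoveryEnabled = conf.getBoolean(
        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);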