Author: szetszwo
Date: Sat Nov 10 00:49:15 2012
New Revision: 1407706

URL: http://svn.apache.org/viewvc?rev=1407706&view=rev
Log:
Merge r1406415 through r1407703 from trunk.
Added:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/
      - copied from r1407703, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/
      - copied from r1407703, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm
      - copied unchanged from r1407703, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm
Removed:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/build-utils.xml
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/build.xml
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ivy/
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ivy.xml
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1406415-1407703

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Sat Nov 10 00:49:15 2012
@@ -194,6 +194,9 @@ Release 2.0.3-alpha - Unreleased
     MAPREDUCE-1806. CombineFileInputFormat does not work with paths not on
     default FS. (Gera Shegalov via tucu)
 
+    MAPREDUCE-4777. In TestIFile, testIFileReaderWithCodec relies on
+    testIFileWriterWithCodec. (Sandy Ryza via tomwhite)
+
 Release 2.0.2-alpha - 2012-09-07
 
   INCOMPATIBLE CHANGES
 
@@ -584,6 +587,10 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4752. Reduce MR AM memory usage through String Interning
     (Robert Evans via tgraves)
 
+    MAPREDUCE-4266. remove Ant remnants from MR (tgraves via bobby)
+
+    MAPREDUCE-4666. JVM metrics for history server (jlowe via jeagles)
+
   OPTIMIZATIONS
 
   BUG FIXES
 
@@ -634,6 +641,15 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4771. KeyFieldBasedPartitioner not partitioning properly when
     configured (jlowe via bobby)
+
+    MAPREDUCE-4772. Fetch failures can take way too long for a map to be
+    restarted (bobby)
+
+    MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit
+    (Mark Fuhs via bobby)
+
+    MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
+    state (jlowe via bobby)
 
 Release 0.23.4 - UNRELEASED

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1406415-1407703

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1406415-1407703

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Sat Nov 10 00:49:15 2012
@@ -68,6 +68,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -347,6 +348,9 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               EnumSet.of(JobEventType.JOB_KILL,
                   JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_TASK_COMPLETED,
+                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from KILLED state
@@ -1409,16 +1413,22 @@ public class JobImpl implements org.apac
       fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
       job.fetchFailuresMapping.put(mapId, fetchFailures);
 
-      //get number of running reduces
-      int runningReduceTasks = 0;
+      //get number of shuffling reduces
+      int shufflingReduceTasks = 0;
       for (TaskId taskId : job.reduceTasks) {
-        if (TaskState.RUNNING.equals(job.tasks.get(taskId).getState())) {
-          runningReduceTasks++;
+        Task task = job.tasks.get(taskId);
+        if (TaskState.RUNNING.equals(task.getState())) {
+          for(TaskAttempt attempt : task.getAttempts().values()) {
+            if(attempt.getReport().getPhase() == Phase.SHUFFLE) {
+              shufflingReduceTasks++;
+              break;
+            }
+          }
         }
       }
 
-      float failureRate = runningReduceTasks == 0 ? 1.0f :
-          (float) fetchFailures / runningReduceTasks;
+      float failureRate = shufflingReduceTasks == 0 ? 1.0f :
+          (float) fetchFailures / shufflingReduceTasks;
       // declare faulty if fetch-failures >= max-allowed-failures
       boolean isMapFaulty =
           (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
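[Aside, not part of the commit: the JobImpl hunk above changes the fetch-failure rate to be measured against reduces still in the SHUFFLE phase rather than all running reduces. Below is a minimal standalone sketch of that computation, with a stand-in Phase enum and an illustrative threshold; JobImpl's own constants are private and not shown in this diff.]

import java.util.Arrays;
import java.util.List;

public class FetchFailureRateSketch {

  // Stand-in for org.apache.hadoop.mapreduce.v2.api.records.Phase.
  enum Phase { STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP }

  // Illustrative threshold only, not JobImpl's committed constant.
  static final float MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5f;

  // The key change: the denominator counts only reduces still in the
  // SHUFFLE phase, so reduces that have moved on to sort/reduce no longer
  // dilute the failure rate, and a lost map output is relaunched sooner.
  static boolean isMapFaulty(int fetchFailures, List<Phase> runningReducePhases) {
    int shufflingReduceTasks = 0;
    for (Phase phase : runningReducePhases) {
      if (phase == Phase.SHUFFLE) {
        shufflingReduceTasks++;
      }
    }
    float failureRate = shufflingReduceTasks == 0 ? 1.0f
        : (float) fetchFailures / shufflingReduceTasks;
    return failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION;
  }

  public static void main(String[] args) {
    // One reduce still shuffling, two already reducing, 2 fetch failures:
    // rate = 2 / 1 = 2.0, over the illustrative 0.5 threshold -> faulty.
    System.out.println(isMapFaulty(2,
        Arrays.asList(Phase.SHUFFLE, Phase.REDUCE, Phase.REDUCE)));
  }
}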
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Sat Nov 10 00:49:15 2012
@@ -18,14 +18,19 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -37,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Assert;
 import org.junit.Test;
@@ -254,6 +260,169 @@ public class TestFetchFailure {
     events = job.getTaskAttemptCompletionEvents(0, 100);
     Assert.assertEquals("Num completion events not correct", 2, events.length);
   }
+
+  @Test
+  public void testFetchFailureMultipleReduces() throws Exception {
+    MRApp app = new MRApp(1, 3, false, this.getClass().getName(), true);
+    Configuration conf = new Configuration();
+    // map -> reduce -> fetch-failure -> map retry is incompatible with
+    // sequential, single-task-attempt approach in uber-AM, so disable:
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+        4, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    Task reduceTask2 = it.next();
+    Task reduceTask3 = it.next();
+
+    //wait for Task state move to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    TaskAttemptCompletionEvent[] events =
+        job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        1, events.length);
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+
+    // wait for reduce to start running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+    app.waitForState(reduceTask3, TaskState.RUNNING);
+    TaskAttempt reduceAttempt =
+        reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    updateStatus(app, reduceAttempt, Phase.SHUFFLE);
+
+    TaskAttempt reduceAttempt2 =
+        reduceTask2.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt2, TaskAttemptState.RUNNING);
+    updateStatus(app, reduceAttempt2, Phase.SHUFFLE);
+
+    TaskAttempt reduceAttempt3 =
+        reduceTask3.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt3, TaskAttemptState.RUNNING);
+    updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
+
+    //send 3 fetch failures from reduce to trigger map re execution
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+    //We should not re-launch the map task yet
+    assertEquals(TaskState.SUCCEEDED, mapTask.getState());
+    updateStatus(app, reduceAttempt2, Phase.REDUCE);
+    updateStatus(app, reduceAttempt3, Phase.REDUCE);
+
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+    //wait for map Task state move back to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+
+    //map attempt must have become FAILED
+    Assert.assertEquals("Map TaskAttempt state not correct",
+        TaskAttemptState.FAILED, mapAttempt1.getState());
+
+    Assert.assertEquals("Num attempts in Map Task not correct",
+        2, mapTask.getAttempts().size());
+
+    Iterator<TaskAttempt> atIt = mapTask.getAttempts().values().iterator();
+    atIt.next();
+    TaskAttempt mapAttempt2 = atIt.next();
+
+    app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
+    //send the done signal to the second map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt2.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt2.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt3.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+
+    //previous completion event now becomes obsolete
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        6, events.length);
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[0].getAttemptId());
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[1].getAttemptId());
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt2.getID(), events[2].getAttemptId());
+    Assert.assertEquals("Event reduce attempt id not correct",
+        reduceAttempt.getID(), events[3].getAttemptId());
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.FAILED, events[1].getStatus());
+    Assert.assertEquals("Event status not correct for map attempt2",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
+    Assert.assertEquals("Event status not correct for reduce attempt1",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
+
+    TaskAttemptCompletionEvent mapEvents[] =
+        job.getMapAttemptCompletionEvents(0, 2);
+    Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
+    Assert.assertArrayEquals("Unexpected map events",
+        Arrays.copyOfRange(events, 0, 2), mapEvents);
+    mapEvents = job.getMapAttemptCompletionEvents(2, 200);
+    Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
+    Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+  }
+
+
+  private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
+    TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
+    status.counters = new Counters();
+    status.fetchFailedMaps = new ArrayList<TaskAttemptId>();
+    status.id = attempt.getID();
+    status.mapFinishTime = 0;
+    status.outputSize = 0;
+    status.phase = phase;
+    status.progress = 0.5f;
+    status.shuffleFinishTime = 0;
+    status.sortFinishTime = 0;
+    status.stateString = "OK";
+    status.taskState = attempt.getState();
+    TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(),
+        status);
+    app.getContext().getEventHandler().handle(event);
+  }
 
   private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
       TaskAttempt mapAttempt) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Sat Nov 10 00:49:15 2012
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -42,6 +43,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -51,10 +53,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -340,7 +346,7 @@ public class TestJobImpl {
     return isUber;
   }
 
-  private InitTransition getInitTransition() {
+  private static InitTransition getInitTransition() {
     InitTransition initTransition = new InitTransition() {
       @Override
       protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
@@ -350,4 +356,63 @@ public class TestJobImpl {
     };
     return initTransition;
   }
+
+  @Test
+  public void testTransitionsAtFailed() throws IOException {
+    Configuration conf = new Configuration();
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    OutputCommitter committer = mock(OutputCommitter.class);
+    doThrow(new IOException("forcefail"))
+      .when(committer).setupJob(any(JobContext.class));
+    InlineDispatcher dispatcher = new InlineDispatcher();
+    JobImpl job = new StubbedJob(jobId, Records
+        .newRecord(ApplicationAttemptId.class), conf,
+        dispatcher.getEventHandler(), committer, true, null);
+
+    dispatcher.register(JobEventType.class, job);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+
+    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+    job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+  }
+
+  private static class StubbedJob extends JobImpl {
+    //override the init transition
+    private final InitTransition initTransition = getInitTransition();
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
+        = stateMachineFactory.addTransition(JobStateInternal.NEW,
+            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+            JobEventType.JOB_INIT,
+            // This is abusive.
+            initTransition);
+
+    private final StateMachine<JobStateInternal, JobEventType, JobEvent>
+        localStateMachine;
+
+    @Override
+    protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
+      return localStateMachine;
+    }
+
+    public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
+        Configuration conf, EventHandler eventHandler,
+        OutputCommitter committer, boolean newApiCommitter, String user) {
+      super(jobId, applicationAttemptId, conf, eventHandler,
+          null, new JobTokenSecretManager(), new Credentials(),
+          new SystemClock(), null, MRAppMetrics.create(), committer,
+          newApiCommitter, user, System.currentTimeMillis(), null, null);
+
+      // This "this leak" is okay because the retained pointer is in an
+      // instance variable.
+      localStateMachine = localFactory.make(this);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Sat Nov 10 00:49:15 2012
@@ -262,6 +262,9 @@ public interface MRJobConfig {
   public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
 
   public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
+
+  public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = "mapreduce.reduce.shuffle.retry-delay.max.ms";
+  public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000;
 
   public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java Sat Nov 10 00:49:15 2012
@@ -107,25 +107,14 @@ public class NLineInputFormat extends Fi
         numLines++;
         length += num;
         if (numLines == numLinesPerSplit) {
-          // NLineInputFormat uses LineRecordReader, which always reads
-          // (and consumes) at least one character out of its upper split
-          // boundary. So to make sure that each mapper gets N lines, we
-          // move back the upper split limits of each split
-          // by one character here.
-          if (begin == 0) {
-            splits.add(new FileSplit(fileName, begin, length - 1,
-              new String[] {}));
-          } else {
-            splits.add(new FileSplit(fileName, begin - 1, length,
-              new String[] {}));
-          }
+          splits.add(createFileSplit(fileName, begin, length));
           begin += length;
           length = 0;
           numLines = 0;
         }
       }
       if (numLines != 0) {
-        splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+        splits.add(createFileSplit(fileName, begin, length));
       }
     } finally {
       if (lr != null) {
@@ -134,6 +123,23 @@ public class NLineInputFormat extends Fi
     }
     return splits;
   }
+
+  /**
+   * NLineInputFormat uses LineRecordReader, which always reads
+   * (and consumes) at least one character out of its upper split
+   * boundary. So to make sure that each mapper gets N lines, we
+   * move back the upper split limits of each split
+   * by one character here.
+   * @param fileName  Path of file
+   * @param begin  the position of the first byte in the file to process
+   * @param length  number of bytes in InputSplit
+   * @return  FileSplit
+   */
+  protected static FileSplit createFileSplit(Path fileName, long begin, long length) {
+    return (begin == 0)
+      ? new FileSplit(fileName, begin, length - 1, new String[] {})
+      : new FileSplit(fileName, begin - 1, length, new String[] {});
+  }
 
   /**
    * Set the number of lines per split
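[Aside, not part of the commit: the boundary arithmetic in createFileSplit above is easiest to see with a tiny worked example. The sketch below uses a hypothetical stand-in Split type; only the begin/length arithmetic mirrors the patch.]

import java.util.ArrayList;
import java.util.List;

public class NLineSplitSketch {

  // Stand-in for FileSplit carrying only (start, length); the path is omitted.
  static class Split {
    final long start, length;
    Split(long start, long length) { this.start = start; this.length = length; }
    public String toString() { return "[start=" + start + ", length=" + length + "]"; }
  }

  // Mirrors createFileSplit: the first split gives up its last byte
  // (length - 1); every later split starts one byte early (begin - 1).
  // LineRecordReader always reads one character past its upper boundary
  // and, for a non-zero start, skips the partial first line, so each
  // mapper still sees exactly N whole lines.
  static Split createSplit(long begin, long length) {
    return (begin == 0)
        ? new Split(begin, length - 1)
        : new Split(begin - 1, length);
  }

  public static void main(String[] args) {
    // Hypothetical file "0\n1\n2\n3\n" (8 bytes), two lines per split:
    // the logical byte ranges are [0,4) and [4,8); the adjusted splits
    // become (start=0, length=3) and (start=3, length=4), and each
    // mapper reads exactly two lines.
    List<Split> splits = new ArrayList<Split>();
    splits.add(createSplit(0, 4));
    splits.add(createSplit(4, 4));
    System.out.println(splits);
  }
}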
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Sat Nov 10 00:49:15 2012
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -283,6 +284,7 @@ class Fetcher<K,V> extends Thread {
       SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
       LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
     } catch (IOException ie) {
+      boolean connectExcpt = ie instanceof ConnectException;
       ioErrs.increment(1);
       LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
                " map outputs", ie);
@@ -291,14 +293,14 @@ class Fetcher<K,V> extends Thread {
       // indirectly penalizing the host
       if (!connectSucceeded) {
         for(TaskAttemptID left: remaining) {
-          scheduler.copyFailed(left, host, connectSucceeded);
+          scheduler.copyFailed(left, host, connectSucceeded, connectExcpt);
         }
       } else {
         // If we got a read error at this stage, it implies there was a problem
         // with the first map, typically lost map. So, penalize only that map
         // and add the rest
         TaskAttemptID firstMap = maps.get(0);
-        scheduler.copyFailed(firstMap, host, connectSucceeded);
+        scheduler.copyFailed(firstMap, host, connectSucceeded, connectExcpt);
       }
 
       // Add back all the remaining maps, WITHOUT marking them as failed
@@ -322,7 +324,7 @@ class Fetcher<K,V> extends Thread {
       if(failedTasks != null && failedTasks.length > 0) {
         LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
         for(TaskAttemptID left: failedTasks) {
-          scheduler.copyFailed(left, host, true);
+          scheduler.copyFailed(left, host, true, false);
         }
       }
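[Aside, not part of the commit: the Fetcher change above distinguishes a refused connection from other I/O errors. A minimal standalone sketch of the resulting reporting decision, with illustrative parameter names rather than the patch's fields:]

import java.io.IOException;
import java.net.ConnectException;

public class ConnectFailureSketch {

  // Distills the Fetcher/ShuffleScheduler change: a ConnectException means
  // the node's shuffle service is unreachable, so the fetch failure is
  // reported to the AM immediately instead of waiting for the usual
  // read-error or failure-count thresholds.
  static boolean shouldReportImmediately(IOException cause, boolean readError,
      boolean reportReadErrorImmediately, int failures,
      int maxFetchFailuresBeforeReporting) {
    boolean connectExcpt = cause instanceof ConnectException;
    return connectExcpt
        || (reportReadErrorImmediately && readError)
        || (failures % maxFetchFailuresBeforeReporting == 0);
  }

  public static void main(String[] args) {
    // A refused connection is escalated on the very first failure.
    System.out.println(shouldReportImmediately(
        new ConnectException("Connection refused"), false, true, 1, 10));
  }
}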
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Sat Nov 10 00:49:15 2012
@@ -89,6 +89,7 @@ class ShuffleScheduler<K,V> {
   private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
 
   private boolean reportReadErrorImmediately = true;
+  private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
 
   public ShuffleScheduler(JobConf job, TaskStatus status,
                           ExceptionReporter reporter,
@@ -115,6 +116,9 @@ class ShuffleScheduler<K,V> {
         MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
     this.reportReadErrorImmediately = job.getBoolean(
         MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
+
+    this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
+        MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
   }
 
   public synchronized void copySucceeded(TaskAttemptID mapId,
@@ -159,7 +163,7 @@ class ShuffleScheduler<K,V> {
   }
 
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
-                                      boolean readError) {
+                                      boolean readError, boolean connectExcpt) {
     host.penalize();
     int failures = 1;
     if (failureCounts.containsKey(mapId)) {
@@ -184,12 +188,15 @@ class ShuffleScheduler<K,V> {
       }
     }
 
-    checkAndInformJobTracker(failures, mapId, readError);
+    checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
 
     checkReducerHealth();
 
    long delay = (long) (INITIAL_PENALTY *
        Math.pow(PENALTY_GROWTH_RATE, failures));
+    if (delay > maxDelay) {
+      delay = maxDelay;
+    }
 
    penalties.add(new Penalty(host, delay));
 
@@ -200,8 +207,9 @@ class ShuffleScheduler<K,V> {
   // after every read error, if 'reportReadErrorImmediately' is true or
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformJobTracker(
-      int failures, TaskAttemptID mapId, boolean readError) {
-    if ((reportReadErrorImmediately && readError)
+      int failures, TaskAttemptID mapId, boolean readError,
+      boolean connectExcpt) {
+    if (connectExcpt || (reportReadErrorImmediately && readError)
         || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
       LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
       status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Sat Nov 10 00:49:15 2012
@@ -111,6 +111,14 @@
 </property>
 
 <property>
+  <name>mapreduce.reduce.shuffle.retry-delay.max.ms</name>
+  <value>60000</value>
+  <description>The maximum number of ms the reducer will delay before retrying
+  to download map data.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.reduce.shuffle.parallelcopies</name>
   <value>5</value>
   <description>The default number of parallel transfers run by reduce

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1406415-1407703
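[Aside, not part of the commit: the ShuffleScheduler hunks above clamp the exponential retry penalty at mapreduce.reduce.shuffle.retry-delay.max.ms (default 60000 ms). A minimal sketch of that backoff, using assumed illustrative values for the scheduler's private INITIAL_PENALTY and PENALTY_GROWTH_RATE constants:]

public class ShuffleRetryDelaySketch {

  // Assumed illustrative values; the real constants are private to
  // ShuffleScheduler and not shown in this diff.
  static final long INITIAL_PENALTY = 10000L;    // ms
  static final float PENALTY_GROWTH_RATE = 1.3f;
  // Default for mapreduce.reduce.shuffle.retry-delay.max.ms (see above).
  static final long MAX_DELAY = 60000L;          // ms

  // Exponential backoff per failure, now clamped to the configured cap.
  static long retryDelay(int failures) {
    long delay = (long) (INITIAL_PENALTY * Math.pow(PENALTY_GROWTH_RATE, failures));
    if (delay > MAX_DELAY) {
      delay = MAX_DELAY;  // the MAPREDUCE-4772 fix: never back off past the cap
    }
    return delay;
  }

  public static void main(String[] args) {
    // Without the cap the delay grows without bound and a transiently bad
    // host can stall a reduce for a long time; with it, the host is retried
    // at least once per minute.
    for (int failures = 1; failures <= 10; failures++) {
      System.out.println("failures=" + failures + " delay=" + retryDelay(failures) + "ms");
    }
  }
}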
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Sat Nov 10 00:49:15 2012
@@ -118,8 +118,8 @@ public class TestFetcher {
         encHash);
 
     verify(allErrs).increment(1);
-    verify(ss).copyFailed(map1ID, host, true);
-    verify(ss).copyFailed(map2ID, host, true);
+    verify(ss).copyFailed(map1ID, host, true, false);
+    verify(ss).copyFailed(map2ID, host, true, false);
 
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
@@ -178,8 +178,8 @@ public class TestFetcher {
         .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
          encHash);
     verify(allErrs, never()).increment(1);
-    verify(ss, never()).copyFailed(map1ID, host, true);
-    verify(ss, never()).copyFailed(map2ID, host, true);
+    verify(ss, never()).copyFailed(map1ID, host, true, false);
+    verify(ss, never()).copyFailed(map2ID, host, true, false);
 
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Sat Nov 10 00:49:15 2012
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
@@ -106,6 +108,8 @@ public class JobHistoryServer extends Co
 
   @Override
   public void start() {
+    DefaultMetricsSystem.initialize("JobHistoryServer");
+    JvmMetrics.initSingleton("JobHistoryServer", null);
     try {
       jhsDTSecretManager.startThreads();
     } catch(IOException io) {
@@ -118,6 +122,7 @@ public class JobHistoryServer extends Co
   @Override
   public void stop() {
     jhsDTSecretManager.stopThreads();
+    DefaultMetricsSystem.shutdown();
     super.stop();
   }
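[Aside, not part of the commit: the JobHistoryServer change above (MAPREDUCE-4666) follows a simple init/shutdown pairing for the metrics system. A hypothetical minimal service mirroring that lifecycle, using only the two calls shown in the diff:]

import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;

public class MetricsLifecycleSketch {

  // On start: initialize the metrics system under a prefix, then register
  // the JVM metrics source so heap, GC, and thread gauges get published.
  public void start() {
    DefaultMetricsSystem.initialize("MetricsLifecycleSketch");
    JvmMetrics.initSingleton("MetricsLifecycleSketch", null);
  }

  // On stop: shut the metrics system down so sources and sinks are
  // unregistered cleanly.
  public void stop() {
    DefaultMetricsSystem.shutdown();
  }

  public static void main(String[] args) {
    MetricsLifecycleSketch service = new MetricsLifecycleSketch();
    service.start();
    // ... serve requests; JVM metrics are now being collected ...
    service.stop();
  }
}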
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java Sat Nov 10 00:49:15 2012
@@ -56,6 +56,10 @@ public class TestIFile {
     Path path = new Path(new Path("build/test.ifile"), "data");
     DefaultCodec codec = new GzipCodec();
     codec.setConf(conf);
+    IFile.Writer<Text, Text> writer =
+        new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+                                     codec, null);
+    writer.close();
     IFile.Reader<Text, Text> reader =
       new IFile.Reader<Text, Text>(conf, rfs, path, codec, null);
     reader.close();

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Sat Nov 10 00:49:15 2012
@@ -50,37 +50,40 @@ public class TestNLineInputFormat extend
     Job job = Job.getInstance(conf);
     Path file = new Path(workDir, "test.txt");
 
-    int seed = new Random().nextInt();
-    Random random = new Random(seed);
-
     localFs.delete(workDir, true);
     FileInputFormat.setInputPaths(job, workDir);
     int numLinesPerMap = 5;
     NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
-    // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
-         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+         length += 1) {
+
       // create a file with length entries
       Writer writer = new OutputStreamWriter(localFs.create(file));
       try {
         for (int i = 0; i < length; i++) {
-          writer.write(Integer.toString(i));
+          writer.write(Integer.toString(i)+" some more text");
           writer.write("\n");
         }
       } finally {
         writer.close();
       }
-      checkFormat(job, numLinesPerMap);
+      int lastN = 0;
+      if (length != 0) {
+        lastN = length % 5;
+        if (lastN == 0) {
+          lastN = 5;
+        }
+      }
+      checkFormat(job, numLinesPerMap, lastN);
     }
   }
 
-  void checkFormat(Job job, int expectedN)
+  void checkFormat(Job job, int expectedN, int lastN)
       throws IOException, InterruptedException {
     NLineInputFormat format = new NLineInputFormat();
     List<InputSplit> splits = format.getSplits(job);
-    // check all splits except last one
     int count = 0;
-    for (int i = 0; i < splits.size() -1; i++) {
+    for (int i = 0; i < splits.size(); i++) {
       assertEquals("There are no split locations", 0,
                    splits.get(i).getLocations().length);
       TaskAttemptContext context = MapReduceTestUtil.
@@ -104,8 +107,13 @@ public class TestNLineInputFormat extend
       } finally {
         reader.close();
       }
-      assertEquals("number of lines in split is " + expectedN ,
-                   expectedN, count);
+      if ( i == splits.size() - 1) {
+        assertEquals("number of lines in split(" + i + ") is wrong" ,
+                     lastN, count);
+      } else {
+        assertEquals("number of lines in split(" + i + ") is wrong" ,
+                     expectedN, count);
+      }
     }
   }