Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1426539&r1=1426538&r2=1426539&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Dec 28 15:06:15 2012 @@ -19,46 +19,51 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; import java.util.EnumSet; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TypeConverter; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; -import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler; +import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; -import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition; -import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Records; @@ -69,121 +74,223 @@ import org.junit.Test; /** * Tests various functions of the JobImpl class */ -@SuppressWarnings({"unchecked", "rawtypes"}) +@SuppressWarnings({"rawtypes"}) public class TestJobImpl { @Test - public void testJobNoTasksTransition() { - JobNoTasksCompletedTransition trans = new JobNoTasksCompletedTransition(); - JobImpl mockJob = mock(JobImpl.class); - - // Force checkJobCompleteSuccess to return null - Task mockTask = mock(Task.class); - Map<TaskId, Task> tasks = new HashMap<TaskId, Task>(); - tasks.put(mockTask.getID(), mockTask); - mockJob.tasks = tasks; + public void testJobNoTasks() { + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.NUM_REDUCES, 0); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + OutputCommitter committer = mock(OutputCommitter.class); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + JobImpl job = createStubbedJob(conf, dispatcher, 0); + job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT)); + assertJobState(job, JobStateInternal.INITED); + job.handle(new JobEvent(job.getID(), JobEventType.JOB_START)); + assertJobState(job, JobStateInternal.SUCCEEDED); + dispatcher.stop(); + commitHandler.stop(); + } - when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR); - JobEvent mockJobEvent = mock(JobEvent.class); - JobStateInternal state = trans.transition(mockJob, mockJobEvent); - Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition", - JobStateInternal.ERROR, state); + @Test(timeout=20000) + public void testCommitJobFailsJob() throws Exception { + Configuration conf = new Configuration(); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + CyclicBarrier syncBarrier = new CyclicBarrier(2); + OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + completeJobTasks(job); + assertJobState(job, JobStateInternal.COMMITTING); + + // let the committer fail and verify the job fails + syncBarrier.await(); + assertJobState(job, JobStateInternal.FAILED); + dispatcher.stop(); + commitHandler.stop(); } - @Test - public void testCommitJobFailsJob() { + @Test(timeout=20000) + public void testCheckJobCompleteSuccess() throws Exception { + Configuration conf = new Configuration(); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + CyclicBarrier syncBarrier = new CyclicBarrier(2); + OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + completeJobTasks(job); + assertJobState(job, JobStateInternal.COMMITTING); + + // let the committer complete and verify the job succeeds + syncBarrier.await(); + assertJobState(job, JobStateInternal.SUCCEEDED); + dispatcher.stop(); + commitHandler.stop(); + } + + @Test(timeout=20000) + public void testKilledDuringSetup() throws Exception { + Configuration conf = new Configuration(); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + OutputCommitter committer = new StubbedOutputCommitter() { + @Override + public synchronized void setupJob(JobContext jobContext) + throws IOException { + while (!Thread.interrupted()) { + try { + wait(); + } catch (InterruptedException e) { + } + } + } + }; + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); - JobImpl mockJob = mock(JobImpl.class); - mockJob.tasks = new HashMap<TaskId, Task>(); - OutputCommitter mockCommitter = mock(OutputCommitter.class); - EventHandler mockEventHandler = mock(EventHandler.class); - JobContext mockJobContext = mock(JobContext.class); - - when(mockJob.getCommitter()).thenReturn(mockCommitter); - when(mockJob.getEventHandler()).thenReturn(mockEventHandler); - when(mockJob.getJobContext()).thenReturn(mockJobContext); - when(mockJob.finished(JobStateInternal.KILLED)).thenReturn( - JobStateInternal.KILLED); - when(mockJob.finished(JobStateInternal.FAILED)).thenReturn( - JobStateInternal.FAILED); - when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn( - JobStateInternal.SUCCEEDED); - - try { - doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class)); - } catch (IOException e) { - // commitJob stubbed out, so this can't happen - } - doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class)); - JobStateInternal jobState = JobImpl.checkJobCompleteSuccess(mockJob); - Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " + - "for successful job", jobState); - Assert.assertEquals("checkJobCompleteSuccess returns incorrect state", - JobStateInternal.FAILED, jobState); - verify(mockJob).abortJob( - eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); + JobImpl job = createStubbedJob(conf, dispatcher, 2); + JobId jobId = job.getID(); + job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); + assertJobState(job, JobStateInternal.INITED); + job.handle(new JobEvent(jobId, JobEventType.JOB_START)); + assertJobState(job, JobStateInternal.SETUP); + + job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL)); + assertJobState(job, JobStateInternal.KILLED); + dispatcher.stop(); + commitHandler.stop(); } - @Test - public void testCheckJobCompleteSuccess() { - - JobImpl mockJob = mock(JobImpl.class); - mockJob.tasks = new HashMap<TaskId, Task>(); - OutputCommitter mockCommitter = mock(OutputCommitter.class); - EventHandler mockEventHandler = mock(EventHandler.class); - JobContext mockJobContext = mock(JobContext.class); - - when(mockJob.getCommitter()).thenReturn(mockCommitter); - when(mockJob.getEventHandler()).thenReturn(mockEventHandler); - when(mockJob.getJobContext()).thenReturn(mockJobContext); - doNothing().when(mockJob).setFinishTime(); - doNothing().when(mockJob).logJobHistoryFinishedEvent(); - when(mockJob.finished(any(JobStateInternal.class))).thenReturn( - JobStateInternal.SUCCEEDED); - - try { - doNothing().when(mockCommitter).commitJob(any(JobContext.class)); - } catch (IOException e) { - // commitJob stubbed out, so this can't happen - } - doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class)); - Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " + - "for successful job", - JobImpl.checkJobCompleteSuccess(mockJob)); - Assert.assertEquals("checkJobCompleteSuccess returns incorrect state", - JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob)); + @Test(timeout=20000) + public void testKilledDuringCommit() throws Exception { + Configuration conf = new Configuration(); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + CyclicBarrier syncBarrier = new CyclicBarrier(2); + OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + completeJobTasks(job); + assertJobState(job, JobStateInternal.COMMITTING); + + syncBarrier.await(); + job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL)); + assertJobState(job, JobStateInternal.KILLED); + dispatcher.stop(); + commitHandler.stop(); } - @Test - public void testCheckJobCompleteSuccessFailed() { - JobImpl mockJob = mock(JobImpl.class); + @Test(timeout=20000) + public void testKilledDuringFailAbort() throws Exception { + Configuration conf = new Configuration(); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + OutputCommitter committer = new StubbedOutputCommitter() { + @Override + public void setupJob(JobContext jobContext) throws IOException { + throw new IOException("forced failure"); + } + + @Override + public synchronized void abortJob(JobContext jobContext, State state) + throws IOException { + while (!Thread.interrupted()) { + try { + wait(); + } catch (InterruptedException e) { + } + } + } + }; + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + JobImpl job = createStubbedJob(conf, dispatcher, 2); + JobId jobId = job.getID(); + job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); + assertJobState(job, JobStateInternal.INITED); + job.handle(new JobEvent(jobId, JobEventType.JOB_START)); + assertJobState(job, JobStateInternal.FAIL_ABORT); - // Make the completedTasks not equal the getTasks() - Task mockTask = mock(Task.class); - Map<TaskId, Task> tasks = new HashMap<TaskId, Task>(); - tasks.put(mockTask.getID(), mockTask); - mockJob.tasks = tasks; - - try { - // Just in case the code breaks and reaches these calls - OutputCommitter mockCommitter = mock(OutputCommitter.class); - EventHandler mockEventHandler = mock(EventHandler.class); - doNothing().when(mockCommitter).commitJob(any(JobContext.class)); - doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class)); - } catch (IOException e) { - e.printStackTrace(); - } - Assert.assertNull("checkJobCompleteSuccess incorrectly returns not-null " + - "for unsuccessful job", - JobImpl.checkJobCompleteSuccess(mockJob)); + job.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); + assertJobState(job, JobStateInternal.KILLED); + dispatcher.stop(); + commitHandler.stop(); } + @Test(timeout=20000) + public void testKilledDuringKillAbort() throws Exception { + Configuration conf = new Configuration(); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + OutputCommitter committer = new StubbedOutputCommitter() { + @Override + public synchronized void abortJob(JobContext jobContext, State state) + throws IOException { + while (!Thread.interrupted()) { + try { + wait(); + } catch (InterruptedException e) { + } + } + } + }; + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + JobImpl job = createStubbedJob(conf, dispatcher, 2); + JobId jobId = job.getID(); + job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); + assertJobState(job, JobStateInternal.INITED); + job.handle(new JobEvent(jobId, JobEventType.JOB_START)); + assertJobState(job, JobStateInternal.SETUP); + + job.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); + assertJobState(job, JobStateInternal.KILL_ABORT); + + job.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); + assertJobState(job, JobStateInternal.KILLED); + dispatcher.stop(); + commitHandler.stop(); + } public static void main(String[] args) throws Exception { TestJobImpl t = new TestJobImpl(); - t.testJobNoTasksTransition(); + t.testJobNoTasks(); t.testCheckJobCompleteSuccess(); - t.testCheckJobCompleteSuccessFailed(); t.testCheckAccess(); t.testReportDiagnostics(); t.testUberDecision(); @@ -208,7 +315,7 @@ public class TestJobImpl { // Verify access JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null, - null, null, null, true, null, 0, null, null); + null, null, true, null, 0, null, null); Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -219,7 +326,7 @@ public class TestJobImpl { // Verify access JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null, - null, null, null, true, null, 0, null, null); + null, null, true, null, 0, null, null); Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -230,7 +337,7 @@ public class TestJobImpl { // Verify access JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null, - null, null, null, true, null, 0, null, null); + null, null, true, null, 0, null, null); Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -241,7 +348,7 @@ public class TestJobImpl { // Verify access JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null, - null, null, null, true, null, 0, null, null); + null, null, true, null, 0, null, null); Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -252,7 +359,7 @@ public class TestJobImpl { // Verify access JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null, - null, null, null, true, null, 0, null, null); + null, null, true, null, 0, null, null); Assert.assertTrue(job5.checkAccess(ugi1, null)); Assert.assertTrue(job5.checkAccess(ugi2, null)); } @@ -270,8 +377,7 @@ public class TestJobImpl { mock(EventHandler.class), null, mock(JobTokenSecretManager.class), null, new SystemClock(), null, - mrAppMetrics, mock(OutputCommitter.class), - true, null, 0, null, null); + mrAppMetrics, true, null, 0, null, null); job.handle(diagUpdateEvent); String diagnostics = job.getReport().getDiagnostics(); Assert.assertNotNull(diagnostics); @@ -282,8 +388,7 @@ public class TestJobImpl { mock(EventHandler.class), null, mock(JobTokenSecretManager.class), null, new SystemClock(), null, - mrAppMetrics, mock(OutputCommitter.class), - true, null, 0, null, null); + mrAppMetrics, true, null, 0, null, null); job.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); job.handle(diagUpdateEvent); diagnostics = job.getReport().getDiagnostics(); @@ -338,20 +443,23 @@ public class TestJobImpl { JobImpl job = new JobImpl(jobId, Records .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class), null, mock(JobTokenSecretManager.class), null, null, null, - mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null); - InitTransition initTransition = getInitTransition(); + mrAppMetrics, true, null, 0, null, null); + InitTransition initTransition = getInitTransition(2); JobEvent mockJobEvent = mock(JobEvent.class); initTransition.transition(job, mockJobEvent); boolean isUber = job.isUber(); return isUber; } - private static InitTransition getInitTransition() { + private static InitTransition getInitTransition(final int numSplits) { InitTransition initTransition = new InitTransition() { @Override protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) { - return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(), - new TaskSplitMetaInfo() }; + TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numSplits]; + for (int i = 0; i < numSplits; ++i) { + splits[i] = new TaskSplitMetaInfo(); + } + return splits; } }; return initTransition; @@ -360,19 +468,24 @@ public class TestJobImpl { @Test public void testTransitionsAtFailed() throws IOException { Configuration conf = new Configuration(); - JobID jobID = JobID.forName("job_1234567890000_0001"); - JobId jobId = TypeConverter.toYarn(jobID); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + OutputCommitter committer = mock(OutputCommitter.class); doThrow(new IOException("forcefail")) .when(committer).setupJob(any(JobContext.class)); - InlineDispatcher dispatcher = new InlineDispatcher(); - JobImpl job = new StubbedJob(jobId, Records - .newRecord(ApplicationAttemptId.class), conf, - dispatcher.getEventHandler(), committer, true, null); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); - dispatcher.register(JobEventType.class, job); + JobImpl job = createStubbedJob(conf, dispatcher, 2); + JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); - Assert.assertEquals(JobState.FAILED, job.getState()); + assertJobState(job, JobStateInternal.INITED); + job.handle(new JobEvent(jobId, JobEventType.JOB_START)); + assertJobState(job, JobStateInternal.FAILED); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED)); Assert.assertEquals(JobState.FAILED, job.getState()); @@ -382,17 +495,86 @@ public class TestJobImpl { Assert.assertEquals(JobState.FAILED, job.getState()); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)); Assert.assertEquals(JobState.FAILED, job.getState()); + + dispatcher.stop(); + commitHandler.stop(); + } + + private static CommitterEventHandler createCommitterEventHandler( + Dispatcher dispatcher, OutputCommitter committer) { + SystemClock clock = new SystemClock(); + AppContext appContext = mock(AppContext.class); + when(appContext.getEventHandler()).thenReturn( + dispatcher.getEventHandler()); + when(appContext.getClock()).thenReturn(clock); + CommitterEventHandler handler = + new CommitterEventHandler(appContext, committer); + dispatcher.register(CommitterEventType.class, handler); + return handler; + } + + private static StubbedJob createStubbedJob(Configuration conf, + Dispatcher dispatcher, int numSplits) { + JobID jobID = JobID.forName("job_1234567890000_0001"); + JobId jobId = TypeConverter.toYarn(jobID); + StubbedJob job = new StubbedJob(jobId, + Records.newRecord(ApplicationAttemptId.class), conf, + dispatcher.getEventHandler(), true, "somebody", numSplits); + dispatcher.register(JobEventType.class, job); + EventHandler mockHandler = mock(EventHandler.class); + dispatcher.register(TaskEventType.class, mockHandler); + dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class, + mockHandler); + dispatcher.register(JobFinishEvent.Type.class, mockHandler); + return job; + } + + private static StubbedJob createRunningStubbedJob(Configuration conf, + Dispatcher dispatcher, int numSplits) { + StubbedJob job = createStubbedJob(conf, dispatcher, numSplits); + job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT)); + assertJobState(job, JobStateInternal.INITED); + job.handle(new JobEvent(job.getID(), JobEventType.JOB_START)); + assertJobState(job, JobStateInternal.RUNNING); + return job; + } + + private static void completeJobTasks(JobImpl job) { + // complete the map tasks and the reduce tasks so we start committing + int numMaps = job.getTotalMaps(); + for (int i = 0; i < numMaps; ++i) { + job.handle(new JobTaskEvent( + MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), + TaskState.SUCCEEDED)); + Assert.assertEquals(JobState.RUNNING, job.getState()); + } + int numReduces = job.getTotalReduces(); + for (int i = 0; i < numReduces; ++i) { + job.handle(new JobTaskEvent( + MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), + TaskState.SUCCEEDED)); + Assert.assertEquals(JobState.RUNNING, job.getState()); + } + } + + private static void assertJobState(JobImpl job, JobStateInternal state) { + int timeToWaitMsec = 5 * 1000; + while (timeToWaitMsec > 0 && job.getInternalState() != state) { + try { + Thread.sleep(10); + timeToWaitMsec -= 10; + } catch (InterruptedException e) { + break; + } + } + Assert.assertEquals(state, job.getInternalState()); } private static class StubbedJob extends JobImpl { //override the init transition - private final InitTransition initTransition = getInitTransition(); - StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory - = stateMachineFactory.addTransition(JobStateInternal.NEW, - EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED), - JobEventType.JOB_INIT, - // This is abusive. - initTransition); + private final InitTransition initTransition; + StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> + localFactory; private final StateMachine<JobStateInternal, JobEventType, JobEvent> localStateMachine; @@ -404,15 +586,102 @@ public class TestJobImpl { public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId, Configuration conf, EventHandler eventHandler, - OutputCommitter committer, boolean newApiCommitter, String user) { + boolean newApiCommitter, String user, int numSplits) { super(jobId, applicationAttemptId, conf, eventHandler, null, new JobTokenSecretManager(), new Credentials(), - new SystemClock(), null, MRAppMetrics.create(), committer, + new SystemClock(), null, MRAppMetrics.create(), newApiCommitter, user, System.currentTimeMillis(), null, null); + initTransition = getInitTransition(numSplits); + localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW, + EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED), + JobEventType.JOB_INIT, + // This is abusive. + initTransition); + // This "this leak" is okay because the retained pointer is in an // instance variable. localStateMachine = localFactory.make(this); } } + + private static class StubbedOutputCommitter extends OutputCommitter { + + public StubbedOutputCommitter() { + super(); + } + + @Override + public void setupJob(JobContext jobContext) throws IOException { + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) + throws IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + } + } + + private static class TestingOutputCommitter extends StubbedOutputCommitter { + CyclicBarrier syncBarrier; + boolean shouldSucceed; + + public TestingOutputCommitter(CyclicBarrier syncBarrier, + boolean shouldSucceed) { + super(); + this.syncBarrier = syncBarrier; + this.shouldSucceed = shouldSucceed; + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } catch (InterruptedException e) { + } + + if (!shouldSucceed) { + throw new IOException("forced failure"); + } + } + } + + private static class WaitingOutputCommitter extends TestingOutputCommitter { + public WaitingOutputCommitter(CyclicBarrier syncBarrier, + boolean shouldSucceed) { + super(syncBarrier, shouldSucceed); + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } catch (InterruptedException e) { + } + + while (!Thread.interrupted()) { + try { + synchronized (this) { + wait(); + } + } catch (InterruptedException e) { + break; + } + } + } + } }
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1426539&r1=1426538&r2=1426539&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Fri Dec 28 15:06:15 2012 @@ -43,7 +43,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapTaskAttemptImpl; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; @@ -253,10 +252,9 @@ public class TestTaskAttempt{ TaskAttemptListener taListener = mock(TaskAttemptListener.class); Path jobFile = mock(Path.class); JobConf jobConf = new JobConf(); - OutputCommitter outputCommitter = mock(OutputCommitter.class); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, - taskSplitMetaInfo, jobConf, taListener, outputCommitter, null, + taskSplitMetaInfo, jobConf, taListener, null, null, clock, null); return taImpl; } @@ -342,7 +340,7 @@ public class TestTaskAttempt{ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, - mock(OutputCommitter.class), mock(Token.class), new Credentials(), + mock(Token.class), new Credentials(), new SystemClock(), null); NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); @@ -397,7 +395,7 @@ public class TestTaskAttempt{ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, - mock(OutputCommitter.class), mock(Token.class), new Credentials(), + mock(Token.class), new Credentials(), new SystemClock(), appCtx); NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); @@ -453,7 +451,7 @@ public class TestTaskAttempt{ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, - mock(OutputCommitter.class), mock(Token.class), new Credentials(), + mock(Token.class), new Credentials(), new SystemClock(), appCtx); NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); @@ -512,7 +510,7 @@ public class TestTaskAttempt{ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, - mock(OutputCommitter.class), mock(Token.class), new Credentials(), + mock(Token.class), new Credentials(), new SystemClock(), appCtx); NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); @@ -578,7 +576,7 @@ public class TestTaskAttempt{ when(resource.getMemory()).thenReturn(1024); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, - jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class), + jobFile, 1, splits, jobConf, taListener, mock(Token.class), new Credentials(), new SystemClock(), appCtx); NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); @@ -628,7 +626,7 @@ public class TestTaskAttempt{ when(resource.getMemory()).thenReturn(1024); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, - jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class), + jobFile, 1, splits, jobConf, taListener, mock(Token.class), new Credentials(), new SystemClock(), appCtx); NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java?rev=1426539&r1=1426538&r2=1426539&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java Fri Dec 28 15:06:15 2012 @@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapTaskAttemptImpl; import org.apache.hadoop.mapred.WrappedJvmID; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; @@ -107,7 +106,7 @@ public class TestTaskAttemptContainerReq TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, mock(TaskSplitMetaInfo.class), jobConf, taListener, - mock(OutputCommitter.class), jobToken, credentials, + jobToken, credentials, new SystemClock(), null); jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString()); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1426539&r1=1426538&r2=1426539&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Fri Dec 28 15:06:15 2012 @@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; -import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; @@ -71,7 +70,6 @@ public class TestTaskImpl { private JobConf conf; private TaskAttemptListener taskAttemptListener; - private OutputCommitter committer; private Token<JobTokenIdentifier> jobToken; private JobId jobId; private Path remoteJobConfFile; @@ -99,13 +97,13 @@ public class TestTaskImpl { public MockTaskImpl(JobId jobId, int partition, EventHandler eventHandler, Path remoteJobConfFile, JobConf conf, - TaskAttemptListener taskAttemptListener, OutputCommitter committer, + TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics, AppContext appContext, TaskType taskType) { super(jobId, taskType , partition, eventHandler, - remoteJobConfFile, conf, taskAttemptListener, committer, + remoteJobConfFile, conf, taskAttemptListener, jobToken, credentials, clock, completedTasksFromPreviousRun, startCount, metrics, appContext); this.taskType = taskType; @@ -120,7 +118,7 @@ public class TestTaskImpl { protected TaskAttemptImpl createAttempt() { MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter, eventHandler, taskAttemptListener, remoteJobConfFile, partition, - conf, committer, jobToken, credentials, clock, appContext, taskType); + conf, jobToken, credentials, clock, appContext, taskType); taskAttempts.add(attempt); return attempt; } @@ -145,12 +143,11 @@ public class TestTaskImpl { public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Path jobFile, int partition, - JobConf conf, OutputCommitter committer, - Token<JobTokenIdentifier> jobToken, + JobConf conf, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, AppContext appContext, TaskType taskType) { super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf, - dataLocations, committer, jobToken, credentials, clock, appContext); + dataLocations, jobToken, credentials, clock, appContext); this.taskType = taskType; } @@ -210,7 +207,6 @@ public class TestTaskImpl { conf = new JobConf(); taskAttemptListener = mock(TaskAttemptListener.class); - committer = mock(OutputCommitter.class); jobToken = (Token<JobTokenIdentifier>) mock(Token.class); remoteJobConfFile = mock(Path.class); credentials = null; @@ -235,7 +231,7 @@ public class TestTaskImpl { private MockTaskImpl createMockTask(TaskType taskType) { return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), - remoteJobConfFile, conf, taskAttemptListener, committer, jobToken, + remoteJobConfFile, conf, taskAttemptListener, jobToken, credentials, clock, completedTasksFromPreviousRun, startCount, metrics, appContext, taskType); @@ -606,7 +602,7 @@ public class TestTaskImpl { @Test public void testFailedTransitions() { mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), - remoteJobConfFile, conf, taskAttemptListener, committer, jobToken, + remoteJobConfFile, conf, taskAttemptListener, jobToken, credentials, clock, completedTasksFromPreviousRun, startCount, metrics, appContext, TaskType.MAP) { Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1426539&r1=1426538&r2=1426539&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Dec 28 15:06:15 2012 @@ -462,6 +462,15 @@ public interface MRJobConfig { public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000; /** + * How long to wait in milliseconds for the output committer to cancel + * an operation when the job is being killed + */ + public static final String MR_AM_COMMITTER_CANCEL_TIMEOUT_MS = + MR_AM_PREFIX + "job.committer.cancel-timeout"; + public static final int DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS = + 60 * 1000; + + /** * Boolean. Create the base dirs in the JobHistoryEventHandler * Set to false for multi-user clusters. This is an internal config that * is set by the MR framework and read by it too. Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1426539&r1=1426538&r2=1426539&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Dec 28 15:06:15 2012 @@ -1298,6 +1298,13 @@ </property> <property> + <name>yarn.app.mapreduce.am.job.committer.cancel-timeout</name> + <value>60000</value> + <description>The amount of time in milliseconds to wait for the output + committer to cancel an operation if the job is killed</description> +</property> + +<property> <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name> <value>1000</value> <description>The interval in ms at which the MR AppMaster should send