Author: jlowe Date: Wed Oct 24 15:25:22 2012 New Revision: 1401729 URL: http://svn.apache.org/viewvc?rev=1401729&view=rev Log: svn merge -c 1401726 FIXES: YARN-139. Interrupted Exception within AsyncDispatcher leads to user confusion. Contributed by Vinod Kumar Vavilapalli
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1401729&r1=1401728&r2=1401729&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Oct 24 15:25:22 2012 @@ -107,6 +107,8 @@ import org.apache.hadoop.yarn.service.Co import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.annotations.VisibleForTesting; + /** * The Map-Reduce Application Master. * The state machine is encapsulated in the implementation of Job interface. @@ -398,52 +400,65 @@ public class MRAppMaster extends Composi protected void sysexit() { System.exit(0); } - - private class JobFinishEventHandler implements EventHandler<JobFinishEvent> { - @Override - public void handle(JobFinishEvent event) { - // job has finished - // this is the only job, so shut down the Appmaster - // note in a workflow scenario, this may lead to creation of a new - // job (FIXME?) - // Send job-end notification - if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { - try { - LOG.info("Job end notification started for jobID : " - + job.getReport().getJobId()); - JobEndNotifier notifier = new JobEndNotifier(); - notifier.setConf(getConfig()); - notifier.notify(job.getReport()); - } catch (InterruptedException ie) { - LOG.warn("Job end notification interrupted for jobID : " - + job.getReport().getJobId(), ie); - } - } - // TODO:currently just wait for some time so clients can know the - // final states. Will be removed once RM come on. + @VisibleForTesting + public void shutDownJob() { + // job has finished + // this is the only job, so shut down the Appmaster + // note in a workflow scenario, this may lead to creation of a new + // job (FIXME?) + // Send job-end notification + if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); + LOG.info("Job end notification started for jobID : " + + job.getReport().getJobId()); + JobEndNotifier notifier = new JobEndNotifier(); + notifier.setConf(getConfig()); + notifier.notify(job.getReport()); + } catch (InterruptedException ie) { + LOG.warn("Job end notification interrupted for jobID : " + + job.getReport().getJobId(), ie); } + } - try { - //We are finishing cleanly so this is the last retry - isLastAMRetry = true; - // Stop all services - // This will also send the final report to the ResourceManager - LOG.info("Calling stop for all the services"); - stop(); - - } catch (Throwable t) { - LOG.warn("Graceful stop failed ", t); - } + // TODO:currently just wait for some time so clients can know the + // final states. Will be removed once RM come on. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + try { + //We are finishing cleanly so this is the last retry + isLastAMRetry = true; + // Stop all services + // This will also send the final report to the ResourceManager + LOG.info("Calling stop for all the services"); + MRAppMaster.this.stop(); + + } catch (Throwable t) { + LOG.warn("Graceful stop failed ", t); + } - //Bring the process down by force. - //Not needed after HADOOP-7140 - LOG.info("Exiting MR AppMaster..GoodBye!"); - sysexit(); + //Bring the process down by force. + //Not needed after HADOOP-7140 + LOG.info("Exiting MR AppMaster..GoodBye!"); + sysexit(); + } + + private class JobFinishEventHandler implements EventHandler<JobFinishEvent> { + @Override + public void handle(JobFinishEvent event) { + // Create a new thread to shutdown the AM. We should not do it in-line + // to avoid blocking the dispatcher itself. + new Thread() { + + @Override + public void run() { + shutDownJob(); + } + }.start(); } } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1401729&r1=1401728&r2=1401729&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Wed Oct 24 15:25:22 2012 @@ -21,17 +21,15 @@ package org.apache.hadoop.mapreduce.v2.a import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; import java.io.IOException; import junit.framework.Assert; import junit.framework.TestCase; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -49,7 +47,6 @@ import org.apache.hadoop.yarn.YarnExcept import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.service.AbstractService; @@ -68,7 +65,6 @@ import org.junit.Test; private Path stagingJobPath = new Path(stagingJobDir); private final static RecordFactory recordFactory = RecordFactoryProvider. getRecordFactory(null); - private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class); @Test public void testDeletionofStaging() throws IOException { @@ -86,9 +82,7 @@ import org.junit.Test; jobid.setAppId(appId); MRAppMaster appMaster = new TestMRApp(attemptId); appMaster.init(conf); - EventHandler<JobFinishEvent> handler = - appMaster.createJobFinishEventHandler(); - handler.handle(new JobFinishEvent(jobid)); + appMaster.shutDownJob(); verify(fs).delete(stagingJobPath, true); }