Author: acmurthy Date: Fri Apr 5 12:24:53 2013 New Revision: 1464947 URL: http://svn.apache.org/r1464947 Log: MAPREDUCE-4824. Provide a mechanism for jobs to indicate they should not be recovered on JobTracker restart. Contributed by Tom White and Arun C. Murthy.
Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/mapred/mapred-default.xml hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobConf.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1464947&r1=1464946&r2=1464947&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Fri Apr 5 12:24:53 2013 @@ -102,6 +102,9 @@ Release 1.2.0 - unreleased HDFS-4651. Offline Image Viewer backport to branch-1. (Chris Nauroth via suresh) + MAPREDUCE-4824. Provide a mechanism for jobs to indicate they should not + be recovered on JobTracker restart. (tomwhite & acmurthy via acmurthy) + IMPROVEMENTS HADOOP-9434. Backport HADOOP-9267: hadoop -h|-{0,2}help should print usage. Modified: hadoop/common/branches/branch-1/src/mapred/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/mapred-default.xml?rev=1464947&r1=1464946&r2=1464947&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/mapred-default.xml (original) +++ hadoop/common/branches/branch-1/src/mapred/mapred-default.xml Fri Apr 5 12:24:53 2013 @@ -289,6 +289,17 @@ </property> <property> + <name>mapreduce.job.restart.recover</name> + <value>true</value> + <description>A per-job override for job recovery. If set to false for a + job then job recovery will not be attempted for that job upon restart + even if mapred.jobtracker.restart.recover is enabled. 
Defaults to true + so that jobs are recovered by default if + mapred.jobtracker.restart.recover is enabled. + </description> +</property> + +<property> <name>mapred.jobtracker.job.history.block.size</name> <value>3145728</value> <description>The block size of the job history file. Since the job recovery Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=1464947&r1=1464946&r2=1464947&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobConf.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Apr 5 12:24:53 2013 @@ -327,6 +327,11 @@ public class JobConf extends Configurati public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN = "^mapreduce\\.workflow\\.adjacency\\..+"; + public static final String MAPREDUCE_RECOVER_JOB = + "mapreduce.job.restart.recover"; + + public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true; + private Credentials credentials = new Credentials(); /** Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1464947&r1=1464946&r2=1464947&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Apr 5 12:24:53 2013 @@ -1428,13 +1428,19 @@ public class JobTracker implements MRCon // Re-submit job final UserGroupInformation ugi = UserGroupInformation .createRemoteUser(token.getUser().toString()); - ugi.doAs(new 
PrivilegedExceptionAction<JobStatus>() { + JobStatus status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException { return submitJob(JobID.downgrade(token.getJobID()), token .getJobSubmitDir().toString(), ugi, ts, true); } }); - recovered++; + if (status == null) { + LOG.info("Job " + jobId + " was not recovered since it " + + "disabled recovery on restart (" + JobConf.MAPREDUCE_RECOVER_JOB + + " set to 'false')."); + } else { + recovered++; + } } catch (Exception e) { LOG.warn("Could not recover job " + jobId, e); } @@ -3515,8 +3521,10 @@ public class JobTracker implements MRCon * JobStatus. Those two sub-objects are sometimes shipped outside of the * JobTracker. But JobInProgress adds info that's useful for the JobTracker * alone. + * @return null if the job is being recovered but mapreduce.job.restart.recover + * is false. */ - public JobStatus submitJob(JobID jobId, String jobSubmitDir, + JobStatus submitJob(JobID jobId, String jobSubmitDir, UserGroupInformation ugi, Credentials ts, boolean recovered) throws IOException { // Check for safe-mode @@ -3544,6 +3552,13 @@ public class JobTracker implements MRCon throw new IOException(e); } + if (recovered && + !job.getJobConf().getBoolean( + JobConf.MAPREDUCE_RECOVER_JOB, + JobConf.DEFAULT_MAPREDUCE_RECOVER_JOB)) { + return null; + } + synchronized (this) { // check if queue is RUNNING String queue = job.getProfile().getQueueName(); Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1464947&r1=1464946&r2=1464947&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original) +++ 
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Fri Apr 5 12:24:53 2013 @@ -202,8 +202,8 @@ public class TestRecoveryManager { new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile, signalFile); - JobClient jc = new JobClient(job1); - RunningJob rJob1 = jc.submitJob(job1); + JobClient jc1 = new JobClient(job1); + RunningJob rJob1 = jc1.submitJob(job1); LOG.info("Submitted first job " + rJob1.getID()); while (rJob1.mapProgress() < 0.5f) { @@ -211,6 +211,27 @@ public class TestRecoveryManager { UtilsForTests.waitFor(100); } + // now submit job2 + JobConf job2 = mr.createJobConf(); + + String signalFile1 = new Path(TEST_DIR, "signal1").toString(); + UtilsForTests.configureWaitingJobConf(job2, + new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0, + "test-recovery-manager", signalFile1, signalFile1); + job2.setBoolean(JobConf.MAPREDUCE_RECOVER_JOB, false); // don't recover + + // submit the job + RunningJob rJob2 = (new JobClient(job2)).submitJob(job2); + LOG.info("Submitted job " + rJob2.getID()); + + // wait for it to init + JobInProgress jip2 = jobtracker.getJob(rJob2.getID()); + + while (!jip2.inited()) { + LOG.info("Waiting for job " + jip2.getJobID() + " to be inited"); + UtilsForTests.waitFor(100); + } + // kill the jobtracker LOG.info("Stopping jobtracker"); mr.stopJobTracker(); @@ -218,11 +239,11 @@ public class TestRecoveryManager { // start the jobtracker LOG.info("Starting jobtracker"); mr.startJobTracker(); - UtilsForTests.waitForJobTracker(jc); + UtilsForTests.waitForJobTracker(jc1); jobtracker = mr.getJobTrackerRunner().getJobTracker(); - // assert that job is recovered by the jobtracker + // assert that only job1 is recovered by the jobtracker Assert.assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length);