Author: acmurthy
Date: Fri Apr  5 12:24:53 2013
New Revision: 1464947

URL: http://svn.apache.org/r1464947
Log:
MAPREDUCE-4824. Provide a mechanism for jobs to indicate they should not be 
recovered on JobTracker restart. Contributed by Tom White and Arun C. Murthy.

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1464947&r1=1464946&r2=1464947&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Apr  5 12:24:53 2013
@@ -102,6 +102,9 @@ Release 1.2.0 - unreleased
     HDFS-4651. Offline Image Viewer backport to branch-1.
     (Chris Nauroth via suresh)
 
+    MAPREDUCE-4824. Provide a mechanism for jobs to indicate they should not
+    be recovered on JobTracker restart. (tomwhite & acmurthy via acmurthy) 
+
   IMPROVEMENTS
 
     HADOOP-9434. Backport HADOOP-9267: hadoop -h|-{0,2}help should print usage.

Modified: hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/mapred-default.xml?rev=1464947&r1=1464946&r2=1464947&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1/src/mapred/mapred-default.xml Fri Apr  5 12:24:53 2013
@@ -289,6 +289,17 @@
 </property>
 
 <property>
+  <name>mapreduce.job.restart.recover</name>
+  <value>true</value>
+  <description>A per-job override for job recovery. If set to false for a
+      job then job recovery will not be attempted for that job upon restart
+      even if mapred.jobtracker.restart.recover is enabled. Defaults to true
+      so that jobs are recovered by default if
+      mapred.jobtracker.restart.recover is enabled.
+  </description>
+</property>
+
+<property>
   <name>mapred.jobtracker.job.history.block.size</name>
   <value>3145728</value>
   <description>The block size of the job history file. Since the job recovery

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=1464947&r1=1464946&r2=1464947&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Apr  5 12:24:53 2013
@@ -327,6 +327,11 @@ public class JobConf extends Configurati
   public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
       "^mapreduce\\.workflow\\.adjacency\\..+";
 
+  public static final String MAPREDUCE_RECOVER_JOB = 
+      "mapreduce.job.restart.recover";
+
+  public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true; 
+
   private Credentials credentials = new Credentials();
   
   /**

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1464947&r1=1464946&r2=1464947&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Apr  5 12:24:53 2013
@@ -1428,13 +1428,19 @@ public class JobTracker implements MRCon
           // Re-submit job  
           final UserGroupInformation ugi = UserGroupInformation
               .createRemoteUser(token.getUser().toString());
-          ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
+          JobStatus status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
             public JobStatus run() throws IOException, InterruptedException {
               return submitJob(JobID.downgrade(token.getJobID()), token
                   .getJobSubmitDir().toString(), ugi, ts, true);
             }
           });
-          recovered++;
+          if (status == null) {
+            LOG.info("Job " + jobId + " was not recovered since it " +
+              "disabled recovery on restart (" + JobConf.MAPREDUCE_RECOVER_JOB +
+              " set to 'false').");
+          } else {
+            recovered++;
+          }
         } catch (Exception e) {
           LOG.warn("Could not recover job " + jobId, e);
         }
@@ -3515,8 +3521,10 @@ public class JobTracker implements MRCon
    * JobStatus. Those two sub-objects are sometimes shipped outside of the
    * JobTracker. But JobInProgress adds info that's useful for the JobTracker
    * alone.
+   * @return null if the job is being recovered but mapreduce.job.restart.recover
+   * is false.
    */
-  public JobStatus submitJob(JobID jobId, String jobSubmitDir,
+  JobStatus submitJob(JobID jobId, String jobSubmitDir,
       UserGroupInformation ugi, Credentials ts, boolean recovered)
       throws IOException {
     // Check for safe-mode
@@ -3544,6 +3552,13 @@ public class JobTracker implements MRCon
       throw new IOException(e);
     }
     
+    if (recovered && 
+        !job.getJobConf().getBoolean(
+            JobConf.MAPREDUCE_RECOVER_JOB, 
+            JobConf.DEFAULT_MAPREDUCE_RECOVER_JOB)) {
+      return null;
+    }
+    
     synchronized (this) {
       // check if queue is RUNNING
       String queue = job.getProfile().getQueueName();

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1464947&r1=1464946&r2=1464947&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Fri Apr  5 12:24:53 2013
@@ -202,8 +202,8 @@ public class TestRecoveryManager {
         new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
         signalFile);
 
-    JobClient jc = new JobClient(job1);
-    RunningJob rJob1 = jc.submitJob(job1);
+    JobClient jc1 = new JobClient(job1);
+    RunningJob rJob1 = jc1.submitJob(job1);
     LOG.info("Submitted first job " + rJob1.getID());
 
     while (rJob1.mapProgress() < 0.5f) {
@@ -211,6 +211,27 @@ public class TestRecoveryManager {
       UtilsForTests.waitFor(100);
     }
 
+    // now submit job2
+    JobConf job2 = mr.createJobConf();
+
+    String signalFile1 = new Path(TEST_DIR, "signal1").toString();
+    UtilsForTests.configureWaitingJobConf(job2, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0, 
+        "test-recovery-manager", signalFile1, signalFile1);
+    job2.setBoolean(JobConf.MAPREDUCE_RECOVER_JOB, false); // don't recover
+    
+    // submit the job
+    RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
+    LOG.info("Submitted job " + rJob2.getID());
+    
+    // wait for it to init
+    JobInProgress jip2 = jobtracker.getJob(rJob2.getID());
+    
+    while (!jip2.inited()) {
+      LOG.info("Waiting for job " + jip2.getJobID() + " to be inited");
+      UtilsForTests.waitFor(100);
+    }
+
     // kill the jobtracker
     LOG.info("Stopping jobtracker");
     mr.stopJobTracker();
@@ -218,11 +239,11 @@ public class TestRecoveryManager {
     // start the jobtracker
     LOG.info("Starting jobtracker");
     mr.startJobTracker();
-    UtilsForTests.waitForJobTracker(jc);
+    UtilsForTests.waitForJobTracker(jc1);
 
     jobtracker = mr.getJobTrackerRunner().getJobTracker();
 
-    // assert that job is recovered by the jobtracker
+    // assert that only job1 is recovered by the jobtracker
     Assert.assertEquals("Resubmission failed ", 1, 
         jobtracker.getAllJobs().length);
 


Reply via email to