Author: acmurthy
Date: Tue Sep 25 17:23:37 2012
New Revision: 1390006

URL: http://svn.apache.org/viewvc?rev=1390006&view=rev
Log:
Merge -c 1356904 from branch-1 to branch-1.1 to fix MAPREDUCE-3837: the JobTracker
is unable to recover jobs after a crash, and afterwards no user can submit a job.
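
In brief, this change drops the history-replay based recovery in JobTracker.RecoveryManager and instead re-submits every job found in the system directory (using the existing job-info file plus a new jobToken file) as the original submitting user. As a minimal, illustrative sketch only (not part of this patch): recovery is driven by the "mapred.jobtracker.restart.recover" property exercised in the tests below; the wrapper class here is hypothetical.

    import org.apache.hadoop.mapred.JobConf;

    public class EnableJobTrackerRecovery {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // With recovery enabled, a restarted JobTracker re-submits each job
        // whose job-info and jobToken files are present in the system dir,
        // instead of replaying its job-history file.
        conf.setBoolean("mapred.jobtracker.restart.recover", true);
        System.out.println("restart.recover = "
            + conf.getBoolean("mapred.jobtracker.restart.recover", false));
      }
    }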

Modified:
    hadoop/common/branches/branch-1.1/CHANGES.txt
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1390006&r1=1390005&r2=1390006&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Sep 25 17:23:37 2012
@@ -365,6 +365,9 @@ Release 1.1.0 - 2012.09.16
    MAPREDUCE-4675. Fixed a race condition caused in TestKillSubProcesses caused
    due to a recent commit. (Bikas Saha via vinodkv)
 
+    MAPREDUCE-3837. Job tracker is not able to recover job in case of crash
+    and after that no user can submit job. (Mayank Bansal via tomwhite)
+
 Release 1.0.4 - Unreleased
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1390006&r1=1390005&r2=1390006&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Sep 25 17:23:37 2012
@@ -205,6 +205,7 @@ public class JobTracker implements MRCon
   State state = State.INITIALIZING;
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
   static final String JOB_INFO_FILE = "job-info";
+  static final String JOB_TOKEN_FILE = "jobToken";
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
@@ -1215,179 +1216,6 @@ public class JobTracker implements MRCon
     /** A custom listener that replays the events in the order in which the 
      * events (task attempts) occurred. 
      */
-    class JobRecoveryListener implements Listener {
-      // The owner job
-      private JobInProgress jip;
-      
-      private JobHistory.JobInfo job; // current job's info object
-      
-      // Maintain the count of the (attempt) events recovered
-      private int numEventsRecovered = 0;
-      
-      // Maintains open transactions
-      private Map<String, String> hangingAttempts = 
-        new HashMap<String, String>();
-      
-      // Whether there are any updates for this job
-      private boolean hasUpdates = false;
-      
-      public JobRecoveryListener(JobInProgress jip) {
-        this.jip = jip;
-        this.job = new JobHistory.JobInfo(jip.getJobID().toString());
-      }
-
-      /**
-       * Process a task. Note that a task might commit a previously pending 
-       * transaction.
-       */
-      private void processTask(String taskId, JobHistory.Task task) {
-        // Any TASK info commits the previous transaction
-        boolean hasHanging = hangingAttempts.remove(taskId) != null;
-        if (hasHanging) {
-          numEventsRecovered += 2;
-        }
-        
-        TaskID id = TaskID.forName(taskId);
-        TaskInProgress tip = getTip(id);
-
-        updateTip(tip, task);
-      }
-
-      /**
-       * Adds a task-attempt in the listener
-       */
-      private void processTaskAttempt(String taskAttemptId, 
-                                      JobHistory.TaskAttempt attempt) 
-        throws UnknownHostException {
-        TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);
-        
-        // Check if the transaction for this attempt can be committed
-        String taskStatus = attempt.get(Keys.TASK_STATUS);
-        TaskAttemptID taskID = TaskAttemptID.forName(taskAttemptId);
-        JobInProgress jip = getJob(taskID.getJobID());
-        JobStatus prevStatus = (JobStatus)jip.getStatus().clone();
-
-        if (taskStatus.length() > 0) {
-          // This means this is an update event
-          if (taskStatus.equals(Values.SUCCESS.name())) {
-            // Mark this attempt as hanging
-            hangingAttempts.put(id.getTaskID().toString(), taskAttemptId);
-            addSuccessfulAttempt(jip, id, attempt);
-          } else {
-            addUnsuccessfulAttempt(jip, id, attempt);
-            numEventsRecovered += 2;
-          }
-        } else {
-          createTaskAttempt(jip, id, attempt);
-        }
-        
-        JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-        if (prevStatus.getRunState() != newStatus.getRunState()) {
-          if(LOG.isDebugEnabled())
-            LOG.debug("Status changed hence informing prevStatus" +  prevStatus + " currentStatus "+ newStatus);
-          JobStatusChangeEvent event =
-            new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED,
-                                     prevStatus, newStatus);
-          updateJobInProgressListeners(event);
-        }
-      }
-
-      public void handle(JobHistory.RecordTypes recType, Map<Keys, 
-                         String> values) throws IOException {
-        if (recType == JobHistory.RecordTypes.Job) {
-          // Update the meta-level job information
-          job.handle(values);
-          
-          // Forcefully init the job as we have some updates for it
-          checkAndInit();
-        } else if (recType.equals(JobHistory.RecordTypes.Task)) {
-          String taskId = values.get(Keys.TASKID);
-          
-          // Create a task
-          JobHistory.Task task = new JobHistory.Task();
-          task.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(task)) {
-            return;
-          }
-            
-          // Process the task i.e update the tip state
-          processTask(taskId, task);
-        } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) {
-          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-          
-          // Create a task attempt
-          JobHistory.MapAttempt attempt = new JobHistory.MapAttempt();
-          attempt.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(attempt)) {
-            return;
-          }
-          
-          // Process the attempt i.e update the attempt state via job
-          processTaskAttempt(attemptId, attempt);
-        } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
-          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-          
-          // Create a task attempt
-          JobHistory.ReduceAttempt attempt = new JobHistory.ReduceAttempt();
-          attempt.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(attempt)) {
-            return;
-          }
-          
-          // Process the attempt i.e update the job state via job
-          processTaskAttempt(attemptId, attempt);
-        }
-      }
-
-      // Check if the task is of type CLEANUP
-      private boolean isCleanup(JobHistory.Task task) {
-        String taskType = task.get(Keys.TASK_TYPE);
-        return Values.CLEANUP.name().equals(taskType);
-      }
-      
-      // Init the job if its ready for init. Also make sure that the scheduler
-      // is updated
-      private void checkAndInit() throws IOException {
-        String jobStatus = this.job.get(Keys.JOB_STATUS);
-        if (Values.PREP.name().equals(jobStatus)) {
-          hasUpdates = true;
-          LOG.info("Calling init from RM for job " + 
jip.getJobID().toString());
-          try {
-            initJob(jip);
-          } catch (Throwable t) {
-            LOG.error("Job initialization failed : \n"
-                + StringUtils.stringifyException(t));
-            jip.status.setFailureInfo("Job Initialization failed: \n"
-                + StringUtils.stringifyException(t));
-            failJob(jip);
-            throw new IOException(t);
-          }
-        }
-      }
-      
-      void close() {
-        if (hasUpdates) {
-          // Apply the final (job-level) updates
-          JobStatusChangeEvent event = updateJob(jip, job);
-          
-          synchronized (JobTracker.this) {
-            // Update the job listeners
-            updateJobInProgressListeners(event);
-          }
-        }
-      }
-      
-      public int getNumEventsRecovered() {
-        return numEventsRecovered;
-      }
-
-    }
     
     public RecoveryManager() {
       jobsToRecover = new TreeSet<JobID>();
@@ -1441,16 +1269,25 @@ public class JobTracker implements MRCon
     // checks if the job dir has the required files
     public void checkAndAddJob(FileStatus status) throws IOException {
       String fileName = status.getPath().getName();
-      if (isJobNameValid(fileName)) {
-        if (JobClient.isJobDirValid(status.getPath(), fs)) {
-          recoveryManager.addJobForRecovery(JobID.forName(fileName));
-          shouldRecover = true; // enable actual recovery if num-files > 1
-        } else {
-          LOG.info("Found an incomplete job directory " + fileName + "." 
-                   + " Deleting it!!");
-          fs.delete(status.getPath(), true);
-        }
+      if (isJobNameValid(fileName) && isJobDirValid(JobID.forName(fileName))) {
+        recoveryManager.addJobForRecovery(JobID.forName(fileName));
+        shouldRecover = true; // enable actual recovery if num-files > 1
+      }
+    }
+    
+    private boolean isJobDirValid(JobID jobId) throws IOException {
+      boolean ret = false;
+      Path jobInfoFile = getSystemFileForJob(jobId);
+      final Path jobTokenFile = getTokenFileForJob(jobId);
+      JobConf job = new JobConf();
+      if (jobTokenFile.getFileSystem(job).exists(jobTokenFile)
+          && jobInfoFile.getFileSystem(job).exists(jobInfoFile)) {
+        ret = true;
+      } else {
+        LOG.warn("Job " + jobId
+            + " does not have valid info/token file so ignoring for recovery");
       }
+      return ret;
     }
     
     private JobStatusChangeEvent updateJob(JobInProgress jip, 
@@ -1713,11 +1550,9 @@ public class JobTracker implements MRCon
         fs.rename(tmpRestartFile, restartFile); // rename .rec to main file
       } else {
         // For the very first time the jobtracker will create a jobtracker.info
-        // file. If the jobtracker has restarted then disable recovery as files'
-        // needed for recovery are missing.
-
-        // disable recovery if this is a restart
-        shouldRecover = false;
+        // file.
+        // enable recovery if this is a restart
+        shouldRecover = true;
 
         // write the jobtracker.info file
         try {
@@ -1769,205 +1604,50 @@ public class JobTracker implements MRCon
       fs.rename(tmpRestartFile, restartFile);
     }
 
-                                   // mapred.JobID::forName returns
-    @SuppressWarnings("unchecked") // mapreduce.JobID
     public void recover() {
+      int recovered = 0;
+      long recoveryProcessStartTime = clock.getTime();
       if (!shouldRecover()) {
         // clean up jobs structure
         jobsToRecover.clear();
         return;
       }
 
-      LOG.info("Restart count of the jobtracker : " + restartCount);
-
-      // I. Init the jobs and cache the recovered job history filenames
-      Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
-      Iterator<JobID> idIter = jobsToRecover.iterator();
-      JobInProgress job = null;
-      File jobIdFile = null;
-
-      // 0. Cleanup
-      try {
-        JobHistory.JobInfo.deleteConfFiles();
-      } catch (IOException ioe) {
-        LOG.info("Error in cleaning up job history folder", ioe);
-      }
-
-      while (idIter.hasNext()) {
-        JobID id = idIter.next();
-        LOG.info("Trying to recover details of job " + id);
+      LOG.info("Starting the recovery process for " + jobsToRecover.size()
+          + " jobs ...");
+      for (JobID jobId : jobsToRecover) {
+        LOG.info("Submitting job " + jobId);
         try {
-          // 1. Recover job owner and create JIP
-          jobIdFile = 
-            new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id, conf).toString());
-
-          String user = null;
-          if (jobIdFile != null && jobIdFile.exists()) {
-            LOG.info("File " + jobIdFile + " exists for job " + id);
-            FileInputStream in = new FileInputStream(jobIdFile);
-            BufferedReader reader = null;
-            try {
-              reader = new BufferedReader(new InputStreamReader(in));
-              user = reader.readLine();
-              LOG.info("Recovered user " + user + " for job " + id);
-            } finally {
-              if (reader != null) {
-                reader.close();
+          Path jobInfoFile = getSystemFileForJob(jobId);
+          final Path jobTokenFile = getTokenFileForJob(jobId);
+          FSDataInputStream in = fs.open(jobInfoFile);
+          final JobInfo token = new JobInfo();
+          token.readFields(in);
+          in.close();
+          final UserGroupInformation ugi = UserGroupInformation
+              .createRemoteUser(token.getUser().toString());
+          ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
+            public JobStatus run() throws IOException, InterruptedException {
+              Credentials ts = null;
+              JobConf job = new JobConf();
+              if (jobTokenFile.getFileSystem(job).exists(jobTokenFile)) {
+                ts = Credentials.readTokenStorageFile(jobTokenFile, job);
               }
-              in.close();
+              return submitJob(JobID.downgrade(token.getJobID()), token
+                  .getJobSubmitDir().toString(), ugi, ts, true);
             }
-          }
-          if (user == null) {
-            throw new RuntimeException("Incomplete job " + id);
-          }
-
-          // Create the job
-          /* THIS PART OF THE CODE IS USELESS. JOB RECOVERY SHOULD BE
-           * BACKPORTED (MAPREDUCE-873)
-           */
-          job = new JobInProgress(JobTracker.this, conf,
-              new JobInfo((org.apache.hadoop.mapreduce.JobID) id,
-                new Text(user), new Path(getStagingAreaDirInternal(user))),
-              restartCount, new Credentials() /*HACK*/);
-
-          // 2. Check if the user has appropriate access
-          // Get the user group info for the job's owner
-          UserGroupInformation ugi =
-            UserGroupInformation.createRemoteUser(job.getJobConf().getUser());
-          LOG.info("Submitting job " + id + " on behalf of user "
-                   + ugi.getShortUserName() + " in groups : "
-                   + StringUtils.arrayToString(ugi.getGroupNames()));
-
-          // check the access
-          try {
-            aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
-          } catch (Throwable t) {
-            LOG.warn("Access denied for user " + ugi.getShortUserName() 
-                     + " in groups : [" 
-                     + StringUtils.arrayToString(ugi.getGroupNames()) + "]");
-            throw t;
-          }
-
-          // 3. Get the log file and the file path
-          String logFileName = 
-            JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
-          if (logFileName != null) {
-            Path jobHistoryFilePath = 
-              JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-
-            // 4. Recover the history file. This involved
-            //     - deleting file.recover if file exists
-            //     - renaming file.recover to file if file doesnt exist
-            // This makes sure that the (master) file exists
-            JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
-                                                     jobHistoryFilePath);
-          
-            // 5. Cache the history file name as it costs one dfs access
-            jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
-          } else {
-            LOG.info("No history file found for job " + id);
-            idIter.remove(); // remove from recovery list
-          }
-
-          // 6. Sumbit the job to the jobtracker
-          addJob(id, job);
-        } catch (Throwable t) {
-          LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
-          idIter.remove();
-          if (jobIdFile != null) {
-            jobIdFile.delete();
-            jobIdFile = null;
-          }
-          if (job != null) {
-            job.fail();
-            job = null;
-          }
-          continue;
-        }
-      }
-
-      long recoveryStartTime = clock.getTime();
-
-      // II. Recover each job
-      idIter = jobsToRecover.iterator();
-      while (idIter.hasNext()) {
-        JobID id = idIter.next();
-        JobInProgress pJob = getJob(id);
-
-        // 1. Get the required info
-        // Get the recovered history file
-        Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
-        String logFileName = jobHistoryFilePath.getName();
-
-        FileSystem fs;
-        try {
-          fs = jobHistoryFilePath.getFileSystem(conf);
-        } catch (IOException ioe) {
-          LOG.warn("Failed to get the filesystem for job " + id + ". 
Ignoring.",
-                   ioe);
-          continue;
-        }
-
-        // 2. Parse the history file
-        // Note that this also involves job update
-        JobRecoveryListener listener = new JobRecoveryListener(pJob);
-        try {
-          JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), 
-                                        listener, fs);
-        } catch (Throwable t) {
-          LOG.info("Error reading history file of job " + pJob.getJobID() 
-                   + ". Ignoring the error and continuing.", t);
-        }
-
-        // 3. Close the listener
-        listener.close();
-        
-        // 4. Update the recovery metric
-        totalEventsRecovered += listener.getNumEventsRecovered();
-
-        // 5. Cleanup history
-        // Delete the master log file as an indication that the new file
-        // should be used in future
-        try {
-          synchronized (pJob) {
-            JobHistory.JobInfo.checkpointRecovery(logFileName, 
-                                                  pJob.getJobConf());
-          }
-        } catch (Throwable t) {
-          LOG.warn("Failed to delete log file (" + logFileName + ") for job " 
-                   + id + ". Continuing.", t);
-        }
-
-        if (pJob.isComplete()) {
-          idIter.remove(); // no need to keep this job info as its successful
+          });
+          recovered++;
+        } catch (Exception e) {
+          LOG.warn("Could not recover job " + jobId, e);
         }
       }
-
-      recoveryDuration = clock.getTime() - recoveryStartTime;
+      recoveryDuration = clock.getTime() - recoveryProcessStartTime;
       hasRecovered = true;
 
-      // III. Finalize the recovery
-      synchronized (trackerExpiryQueue) {
-        // Make sure that the tracker statuses in the expiry-tracker queue
-        // are updated
-        long now = clock.getTime();
-        int size = trackerExpiryQueue.size();
-        for (int i = 0; i < size ; ++i) {
-          // Get the first tasktracker
-          TaskTrackerStatus taskTracker = trackerExpiryQueue.first();
-
-          // Remove it
-          trackerExpiryQueue.remove(taskTracker);
-
-          // Set the new time
-          taskTracker.setLastSeen(now);
-
-          // Add back to get the sorted list
-          trackerExpiryQueue.add(taskTracker);
-        }
-      }
-
-      LOG.info("Restoration complete");
+      LOG.info("Recovery done! Recoverd " + recovered + " of "
+          + jobsToRecover.size() + " jobs.");
+      LOG.info("Recovery Duration (ms):" + recoveryDuration);
     }
     
     int totalEventsRecovered() {
@@ -3917,7 +3597,7 @@ public class JobTracker implements MRCon
   public synchronized JobID getNewJobId() throws IOException {
     return new JobID(getTrackerIdentifier(), nextJobId++);
   }
-
+  
   /**
    * JobTracker.submitJob() kicks off a new job.  
    *
@@ -3928,8 +3608,24 @@ public class JobTracker implements MRCon
    */
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
       throws IOException {
+    return submitJob(jobId, jobSubmitDir, null, ts, false);
+  }
+
+  /**
+   * JobTracker.submitJob() kicks off a new job.
+   * 
+   * Create a 'JobInProgress' object, which contains both JobProfile and
+   * JobStatus. Those two sub-objects are sometimes shipped outside of the
+   * JobTracker. But JobInProgress adds info that's useful for the JobTracker
+   * alone.
+   */
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir,
+      UserGroupInformation ugi, Credentials ts, boolean recovered)
+      throws IOException {
     JobInfo jobInfo = null;
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    if (ugi == null) {
+      ugi = UserGroupInformation.getCurrentUser();
+    }
     synchronized (this) {
       if (jobs.containsKey(jobId)) {
         // job already running, don't start twice
@@ -3970,10 +3666,7 @@ public class JobTracker implements MRCon
       } catch (IOException ioe) {
         throw ioe;
       }
-      boolean recovered = true; // TODO: Once the Job recovery code is there,
-      // (MAPREDUCE-873) we
-      // must pass the "recovered" flag accurately.
-      // This is handled in the trunk/0.22
+
       if (!recovered) {
         // Store the information in a file so that the job can be recovered
         // later (if at all)
@@ -4002,7 +3695,6 @@ public class JobTracker implements MRCon
         failJob(job);
         throw ioe;
       }
-      
       return status;
     }
   }
@@ -4656,6 +4348,11 @@ public class JobTracker implements MRCon
     return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE);
   }
 
+  //Get the job token file in system directory
+  Path getTokenFileForJob(JobID id) {
+    return new Path(getSystemDirectoryForJob(id)+"/" + JOB_TOKEN_FILE);
+  }
+  
   /**
    * Change the run-time priority of the given job.
    * 

Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java?rev=1390006&r1=1390005&r2=1390006&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java Tue Sep 25 17:23:37 2012
@@ -30,9 +30,7 @@ import org.junit.*;
  * This test checks if the jobtracker can detect and recover a tracker that was
  * lost while the jobtracker was down.
  */
-/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
- */
-@Ignore
+
 public class TestJobTrackerRestartWithLostTracker extends TestCase {
   final Path testDir = new Path("/jt-restart-lost-tt-testing");
   final Path inDir = new Path(testDir, "input");
@@ -53,11 +51,14 @@ public class TestJobTrackerRestartWithLo
   throws IOException {
     FileSystem fileSys = dfs.getFileSystem();
     JobConf jobConf = mr.createJobConf();
-    int numMaps = 50;
+    int numMaps = 2;
     int numReds = 1;
     String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
     String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
-    
+
+    // Enable recovery on restart
+    mr.getJobTrackerConf()
+        .setBoolean("mapred.jobtracker.restart.recover", true);
     // Configure the jobs
     JobConf job = configureJob(jobConf, numMaps, numReds, 
                                mapSignalFile, redSignalFile);
@@ -84,10 +85,6 @@ public class TestJobTrackerRestartWithLo
     // Signal the maps to complete
     UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
     
-    // Enable recovery on restart
-    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
-                                      true);
-    
     // Kill the 2nd tasktracker
     mr.stopTaskTracker(1);
     
@@ -102,6 +99,8 @@ public class TestJobTrackerRestartWithLo
     // Wait for the JT to be ready
     UtilsForTests.waitForJobTracker(jobClient);
 
+    // Signal the maps to complete
+    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
     // Signal the reducers to complete
     UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
                               redSignalFile);
@@ -113,9 +112,7 @@ public class TestJobTrackerRestartWithLo
                  + "upon restart", 
                  jobClient.getClusterStatus().getTaskTrackers(), 1);
 
-    // validate the history file
-    TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
-    TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
+    assertTrue("Job should be successful", rJob.isSuccessful());
   }
   
   public void testRestartWithLostTracker() throws IOException {

Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java?rev=1390006&r1=1390005&r2=1390006&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java Tue Sep 25 17:23:37 2012
@@ -34,9 +34,7 @@ import org.junit.*;
  * restart doesnt schedule any new tasks and waits for the (old) trackers to 
  * join back.
  */
-/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
- */
-@Ignore
+
 public class TestJobTrackerSafeMode extends TestCase {
   final Path testDir = 
     new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
@@ -153,6 +151,7 @@ public class TestJobTrackerSafeMode exte
     mr.getTaskTrackerRunner(trackerToKill).getTaskTracker().shutdown();
     mr.stopTaskTracker(trackerToKill);
 
+    LOG.info("Starting the jobtracker...");
     // Restart the jobtracker
     mr.startJobTracker();
 
@@ -169,8 +168,6 @@ public class TestJobTrackerSafeMode exte
     LOG.info("Start a new tracker");
     mr.startTaskTracker(null, null, ++numTracker, numDir);
 
-    // Check if the jobs are still running
-    
     // Wait for the tracker to be lost
     boolean shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
     while (!checkTrackers(jobtracker, trackers, lostTrackers)) {
@@ -181,20 +178,55 @@ public class TestJobTrackerSafeMode exte
       // snapshot jobtracker's scheduling status
       shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
     }
-
-    assertTrue("JobTracker hasnt opened up scheduling even all the" 
-               + " trackers were recovered", 
-               jobtracker.recoveryManager.shouldSchedule());
-    
-    assertEquals("Recovery manager is in inconsistent state", 
-                 0, jobtracker.recoveryManager.recoveredTrackers.size());
     
+    assertTrue("JobTracker has not opened up scheduling after all the"
+        + " trackers were recovered", shouldSchedule);
+
+    assertEquals("Recovery manager is in inconsistent state", 0,
+        jobtracker.recoveryManager.recoveredTrackers.size());
+
+    // Signal the maps to complete
+    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
+
+    // Signal the reducers to complete
+    UtilsForTests
+        .signalTasks(dfs, fileSys, false, mapSignalFile, redSignalFile);
     // wait for the job to be complete
     UtilsForTests.waitTillDone(jobClient);
   }
 
   private boolean checkTrackers(JobTracker jobtracker, Set<String> present, 
                                 Set<String> absent) {
+    while (jobtracker.getClusterStatus(true).getActiveTrackerNames().size() != 2) {
+      LOG.info("Waiting for Initialize all Task Trackers");
+      UtilsForTests.waitFor(1000);
+    }
+    // Checking if the task tracker been initiated again
+    boolean found = false;
+    String strNewTrackerName = (String) (present.toArray()[0]);
+    LOG.info("Number of Trackers: "
+        + jobtracker.getClusterStatus(true).getActiveTrackerNames().size());
+    for (String trackername : jobtracker.getClusterStatus(true)
+        .getActiveTrackerNames()) {
+      if (trackername.equalsIgnoreCase((String) (present.toArray()[0]))) {
+        found = true;
+      } else {
+        String[] trackerhostnames = trackername.split(":");
+        CharSequence cseq = new String(trackerhostnames[0]);
+        if (((String) (present.toArray()[0])).contains(cseq)) {
+          strNewTrackerName = trackername;
+          found = false;
+          break;
+        }
+      }
+    }
+    if (!found) {
+      present.remove(((String) (present.toArray()[0])));
+      LOG.info("Old tracker on this machine got reinited, "
+          + "Tracker added with new port " + strNewTrackerName);
+      present.add(strNewTrackerName);
+    }
+    
     long jobtrackerRecoveryFinishTime = 
       jobtracker.getStartTime() + jobtracker.getRecoveryDuration();
     for (String trackerName : present) {

Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1390006&r1=1390005&r2=1390006&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Tue Sep 25 17:23:37 2012
@@ -41,15 +41,37 @@ import org.junit.*;
  * failures and the jobtracker is able to tolerate {@link RecoveryManager}
  * failure.
  */
-/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
- */
-@Ignore
+
 public class TestRecoveryManager extends TestCase {
   private static final Log LOG = 
     LogFactory.getLog(TestRecoveryManager.class);
   private static final Path TEST_DIR = 
     new Path(System.getProperty("test.build.data", "/tmp"), 
              "test-recovery-manager");
+  private FileSystem fs;
+  private JobConf conf;
+  private MiniMRCluster mr;
+
+  protected void setUp() {
+    JobConf conf = new JobConf();
+    try {
+      fs = FileSystem.get(new Configuration());
+      fs.delete(TEST_DIR, true);
+      conf.set("mapred.jobtracker.job.history.block.size", "1024");
+      conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
+      mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  protected void tearDown() {
+    ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
+        .getClusterStatus(false);
+    if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
+      mr.shutdown();
+    }
+  }
   
   /**
    * Tests the {@link JobTracker} against the exceptions thrown in 
@@ -60,21 +82,12 @@ public class TestRecoveryManager extends
    *  - restarts the jobtracker
    *  - checks if the jobtraker starts normally
    */
-  public void testJobTracker() throws Exception {
+  public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
     LOG.info("Testing jobtracker restart with faulty job");
     String signalFile = new Path(TEST_DIR, "signal").toString();
-    JobConf conf = new JobConf();
-    
-    FileSystem fs = FileSystem.get(new Configuration());
-    fs.delete(TEST_DIR, true); // cleanup
-    
-    conf.set("mapred.jobtracker.job.history.block.size", "1024");
-    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
-    
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
-    
+
     JobConf job1 = mr.createJobConf();
-    
+
     UtilsForTests.configureWaitingJobConf(job1, 
         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0, 
         "test-recovery-manager", signalFile, signalFile);
@@ -131,11 +144,63 @@ public class TestRecoveryManager extends
     // check if the jobtracker came up or not
     assertEquals("JobTracker crashed!", 
                  JobTracker.State.RUNNING, status.getJobTrackerState());
-    
-    mr.shutdown();
   }
   
   /**
+   * Tests the re-submission of the job in case of jobtracker died/restart  
+   *  - submits a job and let it be inited.
+   *  - kills the jobtracker
+   *  - checks if the jobtraker starts normally and job is recovered while 
+   */
+
+  public void testJobResubmission() throws Exception {
+    LOG.info("Testing Job Resubmission");
+    String signalFile = new Path(TEST_DIR, "signal").toString();
+
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf()
+        .setBoolean("mapred.jobtracker.restart.recover", true);
+
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    JobConf job1 = mr.createJobConf();
+    UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
+        new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+        signalFile);
+
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = jc.submitJob(job1);
+    LOG.info("Submitted first job " + rJob1.getID());
+
+    while (rJob1.mapProgress() < 0.5f) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+      UtilsForTests.waitFor(100);
+    }
+
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+
+    // start the jobtracker
+    LOG.info("Starting jobtracker");
+    mr.startJobTracker();
+    UtilsForTests.waitForJobTracker(jc);
+
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    // assert that job is recovered by the jobtracker
+    assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length);
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+      // Signaling Map task to complete
+      fs.create(new Path(TEST_DIR, "signal"));
+      UtilsForTests.waitFor(100);
+    }
+    assertTrue("Task should be successful", rJob1.isSuccessful());
+  }
+
+  /**
   * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown
    * during recovery. It does the following :
    *  - submits a job with HIGH priority and x tasks
@@ -147,19 +212,13 @@ public class TestRecoveryManager extends
    *  - checks if the jobtraker starts normally and job#2 is recovered while 
    *    job#1 is failed.
    */
-  public void testRecoveryManager() throws Exception {
+  public void testJobTrackerRestartWithBadJobs() throws Exception {
     LOG.info("Testing recovery-manager");
     String signalFile = new Path(TEST_DIR, "signal").toString();
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf()
+        .setBoolean("mapred.jobtracker.restart.recover", true);
     
-    // clean up
-    FileSystem fs = FileSystem.get(new Configuration());
-    fs.delete(TEST_DIR, true);
-    
-    JobConf conf = new JobConf();
-    conf.set("mapred.jobtracker.job.history.block.size", "1024");
-    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
-    
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
     JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
     
     JobConf job1 = mr.createJobConf();
@@ -179,7 +238,7 @@ public class TestRecoveryManager extends
       LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
       UtilsForTests.waitFor(100);
     }
-    
+
     // now submit job2
     JobConf job2 = mr.createJobConf();
 
@@ -224,7 +283,7 @@ public class TestRecoveryManager extends
       LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
       UtilsForTests.waitFor(100);
     }
-
+    
     // kill the jobtracker
     LOG.info("Stopping jobtracker");
     mr.stopJobTracker();
@@ -247,21 +306,18 @@ public class TestRecoveryManager extends
     jobtracker = mr.getJobTrackerRunner().getJobTracker();
     
     // assert that job2 is recovered by the jobtracker as job1 would fail
-    assertEquals("Recovery manager failed to tolerate job failures",
-                 2, jobtracker.getAllJobs().length);
+    assertEquals("Recovery manager failed to tolerate job failures", 1,
+        jobtracker.getAllJobs().length);
     
     // check if the job#1 has failed
     JobStatus status = jobtracker.getJobStatus(rJob1.getID());
-    assertEquals("Faulty job not failed", 
-                 JobStatus.FAILED, status.getRunState());
+    assertNull("Faulty job should not be resubmitted", status);
     
     jip = jobtracker.getJob(rJob2.getID());
     assertFalse("Job should be running", jip.isComplete());
     
     status = jobtracker.getJobStatus(rJob3.getID());
-    assertNull("Job should be missing", status);
-    
-    mr.shutdown();
+    assertNull("Job should be missing because of ACL changed", status);
   }
   
   /**
@@ -278,113 +334,76 @@ public class TestRecoveryManager extends
    *     jobtracker should crash.
    */
   public void testRestartCount() throws Exception {
-    LOG.info("Testing restart-count");
+    LOG.info("Testing Job Restart Count");
     String signalFile = new Path(TEST_DIR, "signal").toString();
-    
-    // clean up
-    FileSystem fs = FileSystem.get(new Configuration());
-    fs.delete(TEST_DIR, true);
-    
-    JobConf conf = new JobConf();
-    conf.set("mapred.jobtracker.job.history.block.size", "1024");
-    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
-    conf.setBoolean("mapred.jobtracker.restart.recover", true);
-    // since there is no need for initing
-    conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
-                  TaskScheduler.class);
-    
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
-    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
-    JobClient jc = new JobClient(mr.createJobConf());
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf()
+        .setBoolean("mapred.jobtracker.restart.recover", true);
 
-    // check if the jobtracker info file exists
-    Path infoFile = jobtracker.recoveryManager.getRestartCountFile();
-    assertTrue("Jobtracker infomation is missing", fs.exists(infoFile));
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
 
-    // check if garbling the system files disables the recovery process
-    LOG.info("Stopping jobtracker for testing with system files deleted");
-    mr.stopJobTracker();
-    
-    // delete the info file
-    Path rFile = jobtracker.recoveryManager.getRestartCountFile();
-    fs.delete(rFile,false);
-    
-    // start the jobtracker
-    LOG.info("Starting jobtracker with system files deleted");
-    mr.startJobTracker();
-    
-    UtilsForTests.waitForJobTracker(jc);
-    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    JobConf job1 = mr.createJobConf();
+    // set the high priority
+    job1.setJobPriority(JobPriority.HIGH);
 
-    // check if the recovey is disabled
-    assertFalse("Recovery is not disabled upon missing system files", 
-                jobtracker.recoveryManager.shouldRecover());
-
-    // check if the system dir is sane
-    assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
-    Path tFile = jobtracker.recoveryManager.getTempRestartCountFile();
-    assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
+    UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
+        new Path(TEST_DIR, "output3"), 30, 0, "test-restart", signalFile,
+        signalFile);
 
-    // submit a job
-    JobConf job = mr.createJobConf();
-    
-    UtilsForTests.configureWaitingJobConf(job, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0, 
-        "test-recovery-manager", signalFile, signalFile);
-    
     // submit the faulty job
-    RunningJob rJob = jc.submitJob(job);
-    LOG.info("Submitted first job " + rJob.getID());
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = jc.submitJob(job1);
+    LOG.info("Submitted first job " + rJob1.getID());
+
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
 
-    // wait for 1 min
-    UtilsForTests.waitFor(60000);
+    while (!jip.inited()) {
+      LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
+      UtilsForTests.waitFor(100);
+    }
 
-    // kill the jobtracker multiple times and check if the count is correct
-    for (int i = 1; i <= 5; ++i) {
+    for (int i = 1; i <= 2; ++i) {
       LOG.info("Stopping jobtracker for " + i + " time");
       mr.stopJobTracker();
-      
+
       // start the jobtracker
       LOG.info("Starting jobtracker for " + i + " time");
       mr.startJobTracker();
-      
+
       UtilsForTests.waitForJobTracker(jc);
-      
-      // check if the system dir is sane
-      assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
-      assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
-      
+
       jobtracker = mr.getJobTrackerRunner().getJobTracker();
-      JobInProgress jip = jobtracker.getJob(rJob.getID());
-      
+
       // assert if restart count is correct
-      assertEquals("Recovery manager failed to recover restart count",
-                   i, jip.getNumRestarts());
+      // It should always be 0 now as its resubmit everytime then restart.
+      assertEquals("Recovery manager failed to recover restart count", 0, jip
+          .getNumRestarts());
     }
-    
+
     // kill the old job
-    rJob.killJob();
+    rJob1.killJob();
 
     // II. Submit a new job and check if the restart count is 0
-    JobConf job1 = mr.createJobConf();
-    
-    UtilsForTests.configureWaitingJobConf(job1, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0, 
-        "test-recovery-manager", signalFile, signalFile);
+    JobConf job2 = mr.createJobConf();
+
+    UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"),
+        new Path(TEST_DIR, "output7"), 50, 0, "test-restart-manager",
+        signalFile, signalFile);
 
     // submit a new job
-    rJob = jc.submitJob(job1);
-    LOG.info("Submitted first job after restart" + rJob.getID());
+    RunningJob rJob2 = jc.submitJob(job2);
+    LOG.info("Submitted first job after restart" + rJob2.getID());
 
     // assert if restart count is correct
-    JobInProgress jip = jobtracker.getJob(rJob.getID());
-    assertEquals("Restart count for new job is incorrect",
-                 0, jip.getNumRestarts());
+    jip = jobtracker.getJob(rJob2.getID());
+    assertEquals("Restart count for new job is incorrect", 0, jip
+        .getNumRestarts());
 
     LOG.info("Stopping jobtracker for testing the fs errors");
     mr.stopJobTracker();
 
     // check if system.dir problems in recovery kills the jobtracker
+    Path rFile = jobtracker.recoveryManager.getRestartCountFile();
     fs.delete(rFile, false);
     FSDataOutputStream out = fs.create(rFile);
     out.writeBoolean(true);
@@ -396,8 +415,7 @@ public class TestRecoveryManager extends
     JobTrackerRunner runner = mr.getJobTrackerRunner();
     assertFalse("JobTracker is still alive", runner.isActive());
 
-    mr.shutdown();
-  } 
+  }
 
   /**
    * Test if the jobtracker waits for the info file to be created before 

