Author: acmurthy
Date: Thu Apr  4 13:10:00 2013
New Revision: 1464514

URL: http://svn.apache.org/r1464514
Log:
MAPREDUCE-4463. Fix job recovery failures caused by HDFS permission issues:
the token file was being read as the user who submitted the job rather than
as the JobTracker user. Contributed by Tom White & Arun C Murthy.
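
For reference, a minimal standalone sketch of the pattern this change adopts
(the wrapper class and method below are hypothetical; jobTokenFile,
Credentials.readTokenStorageFile(), and the doAs() shape mirror the
JobTracker.java hunk further down):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    // Hypothetical wrapper; only the ordering of the two steps matters.
    class TokenReadOrderSketch {
      Credentials readThenActAsUser(final Path jobTokenFile, String jobUser)
          throws IOException, InterruptedException {
        JobConf job = new JobConf();
        // Step 1: read the token file in the daemon's (JobTracker's) own
        // security context, so a file readable only by the JT still works.
        final Credentials ts =
            jobTokenFile.getFileSystem(job).exists(jobTokenFile)
                ? Credentials.readTokenStorageFile(jobTokenFile, job)
                : null;
        // Step 2: act as the original job owner; the already-loaded
        // credentials are simply captured by the closure.
        UserGroupInformation ugi =
            UserGroupInformation.createRemoteUser(jobUser);
        return ugi.doAs(new PrivilegedExceptionAction<Credentials>() {
          public Credentials run() {
            return ts; // stand-in for the real submitJob(..., ts, true)
          }
        });
      }
    }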

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1464514&r1=1464513&r2=1464514&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Thu Apr  4 13:10:00 2013
@@ -568,6 +568,11 @@ Release 1.2.0 - unreleased
     HADOOP-7101. UserGroupInformation.getCurrentUser() fails when called from
     non-Hadoop JAAS context. (todd, backported by suresh)
 
+    MAPREDUCE-4463. Fix job recovery failures caused by HDFS permission
+    issues: the token file was being read as the user who submitted the
+    job rather than as the JobTracker user. (tomwhite, acmurthy via
+    acmurthy)
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1464514&r1=1464513&r2=1464514&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Apr  4 13:10:00 2013
@@ -1418,15 +1418,18 @@ public class JobTracker implements MRCon
           final JobInfo token = new JobInfo();
           token.readFields(in);
           in.close();
+          
+          // Read tokens as JT user
+          JobConf job = new JobConf();
+          final Credentials ts = 
+              (jobTokenFile.getFileSystem(job).exists(jobTokenFile)) ?
+            Credentials.readTokenStorageFile(jobTokenFile, job) : null;
+
+          // Re-submit job  
           final UserGroupInformation ugi = UserGroupInformation
               .createRemoteUser(token.getUser().toString());
           ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
             public JobStatus run() throws IOException, InterruptedException {
-              Credentials ts = null;
-              JobConf job = new JobConf();
-              if (jobTokenFile.getFileSystem(job).exists(jobTokenFile)) {
-                ts = Credentials.readTokenStorageFile(jobTokenFile, job);
-              }
               return submitJob(JobID.downgrade(token.getJobID()), token
                   .getJobSubmitDir().toString(), ugi, ts, true);
             }
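
Net effect of the hunk above: Credentials.readTokenStorageFile() now runs in
the JobTracker's own UGI context, before ugi.doAs(), and the resulting final
"ts" is captured by the PrivilegedExceptionAction closure. Previously the
read happened inside doAs() as the remote user created for the job owner,
who need not have permission to read the token file (the new test below
creates mapred.system.dir with mode 0700, owned by the JT user, to reproduce
exactly that).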

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1464514&r1=1464513&r2=1464514&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Thu Apr  4 13:10:00 2013
@@ -20,9 +20,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-
 import java.util.concurrent.CountDownLatch;
-import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,12 +29,16 @@ import org.apache.hadoop.examples.SleepJ
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
 import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
-import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * Test whether the {@link RecoveryManager} is able to tolerate job-recovery 
@@ -51,7 +53,7 @@ public class TestRecoveryManager {
     new Path(System.getProperty("test.build.data", "/tmp"), 
              "test-recovery-manager");
   private FileSystem fs;
-  private JobConf conf;
+  private MiniDFSCluster dfs;
   private MiniMRCluster mr;
 
   @Before
@@ -70,11 +72,24 @@ public class TestRecoveryManager {
 
   @After
   public void tearDown() {
-    if (mr != null) {
-      ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
-          .getClusterStatus(false);
-      if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
-        mr.shutdown();
+    try {
+      if (mr != null) {
+        ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
+            .getClusterStatus(false);
+        if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
+          mr.shutdown();
+        }
+      }
+    } finally {
+      mr = null;
+      
+      try {
+        if (dfs != null) {
+          dfs.shutdown();
+          dfs = null;
+        }
+      } finally {
+        dfs = null;
       }
     }
   }
@@ -526,7 +541,7 @@ public class TestRecoveryManager {
   @Test(timeout=120000)
   public void testJobTrackerInfoCreation() throws Exception {
     LOG.info("Testing jobtracker.info file");
-    MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
+    dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
     String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
                       + (dfs.getFileSystem()).getUri().getPort();
     // shut down the data nodes
@@ -575,4 +590,91 @@ public class TestRecoveryManager {
     Assert.assertFalse("JobTracker failed to create info files with datanodes!",
         failed);
   }
+  
+  static void mkdirWithPerms(FileSystem fs, String dir, short mode) throws IOException {
+    Path p = new Path(dir);
+    fs.mkdirs(p);
+    fs.setPermission(p, new FsPermission(mode));
+  }
+  
+  @Test(timeout=120000)
+  public void testJobResubmissionAsDifferentUser() throws Exception {
+    LOG.info("Testing Job Resubmission as a different user to the jobtracker");
+    
+    final Path HDFS_TEST_DIR = new Path("/tmp");
+    
+    JobConf conf = new JobConf();
+
+    dfs = new MiniDFSCluster(conf, 1, true, null);
+    fs = dfs.getFileSystem();
+
+    conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
+    conf.set("mapred.system.dir", "/mapred");
+    String mapredSysDir =  conf.get("mapred.system.dir");
+    mkdirWithPerms(fs, mapredSysDir, (short)0700);
+    fs.setOwner(new Path(mapredSysDir),
+        UserGroupInformation.getCurrentUser().getUserName(), "mrgroup");
+
+    mkdirWithPerms(fs, "/user", (short)0777);
+    mkdirWithPerms(fs, "/mapred", (short)0777);
+    mkdirWithPerms(fs, "/tmp", (short)0777);
+
+    mr = 
+        new MiniMRCluster(
+            1, dfs.getFileSystem().getUri().toString(), 1, null, null, conf);
+
+    String signalFile = new Path(HDFS_TEST_DIR, "signal").toString();
+
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf()
+    .setBoolean("mapred.jobtracker.restart.recover", true);
+
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    final JobConf job1 = mr.createJobConf();
+    UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
+        new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+        signalFile);
+
+    UserGroupInformation ugi =
+        UserGroupInformation.createUserForTesting("bob", new String[]{"users"});
+    job1.setUser(ugi.getUserName());
+
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
+      public RunningJob run() throws IOException {
+        JobClient jc = new JobClient(job1);
+        return jc.submitJob(job1);
+      }
+    });
+    LOG.info("Submitted first job " + rJob1.getID());
+
+    while (rJob1.mapProgress() < 0.5f) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+      UtilsForTests.waitFor(100);
+    }
+
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+
+    // start the jobtracker
+    LOG.info("Starting jobtracker");
+    mr.startJobTracker();
+    UtilsForTests.waitForJobTracker(jc);
+
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    // assert that job is recovered by the jobtracker
+    Assert.assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length);
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
+
+    // Signaling Map task to complete
+    fs.create(new Path(HDFS_TEST_DIR, "signal"));
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+      UtilsForTests.waitFor(100);
+    }
+    rJob1 = jc.getJob(rJob1.getID());
+    Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
+  }
 }
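
To run just this new test on branch-1, the usual ant invocation should work
(assumed from the branch-1 build conventions; not part of this commit):

    ant test -Dtestcase=TestRecoveryManager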

