Author: acmurthy Date: Thu Apr 4 13:10:00 2013 New Revision: 1464514 URL: http://svn.apache.org/r1464514 Log: MAPREDUCE-4463. Fix job recovery failures caused by HDFS permission issues: during recovery, the job token file was being read as the user who submitted the job rather than as the JobTracker user. Contributed by Tom White & Arun C Murthy.
Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1464514&r1=1464513&r2=1464514&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Thu Apr 4 13:10:00 2013 @@ -568,6 +568,11 @@ Release 1.2.0 - unreleased HADOOP-7101. UserGroupInformation.getCurrentUser() fails when called from non-Hadoop JAAS context. (todd, backported by suresh) + MAPREDUCE-4463. Fix job recovery failures which were caused by HDFS + permission issues since the token file was being read as the user who + submitted the job rather than JobTracker user. (tomwhite, acmurthy via + acmurthy) + Release 1.1.2 - 2013.01.30 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1464514&r1=1464513&r2=1464514&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Apr 4 13:10:00 2013 @@ -1418,15 +1418,18 @@ public class JobTracker implements MRCon final JobInfo token = new JobInfo(); token.readFields(in); in.close(); + + // Read tokens as JT user + JobConf job = new JobConf(); + final Credentials ts = + (jobTokenFile.getFileSystem(job).exists(jobTokenFile)) ? 
+ Credentials.readTokenStorageFile(jobTokenFile, job) : null; + + // Re-submit job final UserGroupInformation ugi = UserGroupInformation .createRemoteUser(token.getUser().toString()); ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException { - Credentials ts = null; - JobConf job = new JobConf(); - if (jobTokenFile.getFileSystem(job).exists(jobTokenFile)) { - ts = Credentials.readTokenStorageFile(jobTokenFile, job); - } return submitJob(JobID.downgrade(token.getJobID()), token .getJobSubmitDir().toString(), ugi, ts, true); } Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1464514&r1=1464513&r2=1464514&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Thu Apr 4 13:10:00 2013 @@ -20,9 +20,7 @@ package org.apache.hadoop.mapred; import java.io.IOException; import java.security.PrivilegedExceptionAction; - import java.util.concurrent.CountDownLatch; -import junit.framework.TestCase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,12 +29,16 @@ import org.apache.hadoop.examples.SleepJ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapred.JobTracker.RecoveryManager; import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner; import org.apache.hadoop.mapred.QueueManager.QueueACL; -import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler; 
import org.apache.hadoop.security.UserGroupInformation; -import org.junit.*; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; /** * Test whether the {@link RecoveryManager} is able to tolerate job-recovery @@ -51,7 +53,7 @@ public class TestRecoveryManager { new Path(System.getProperty("test.build.data", "/tmp"), "test-recovery-manager"); private FileSystem fs; - private JobConf conf; + private MiniDFSCluster dfs; private MiniMRCluster mr; @Before @@ -70,11 +72,24 @@ public class TestRecoveryManager { @After public void tearDown() { - if (mr != null) { - ClusterStatus status = mr.getJobTrackerRunner().getJobTracker() - .getClusterStatus(false); - if (status.getJobTrackerState() == JobTracker.State.RUNNING) { - mr.shutdown(); + try { + if (mr != null) { + ClusterStatus status = mr.getJobTrackerRunner().getJobTracker() + .getClusterStatus(false); + if (status.getJobTrackerState() == JobTracker.State.RUNNING) { + mr.shutdown(); + } + } + } finally { + mr = null; + + try { + if (dfs != null) { + dfs.shutdown(); + dfs = null; + } + } finally { + dfs = null; } } } @@ -526,7 +541,7 @@ public class TestRecoveryManager { @Test(timeout=120000) public void testJobTrackerInfoCreation() throws Exception { LOG.info("Testing jobtracker.info file"); - MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null); + dfs = new MiniDFSCluster(new Configuration(), 1, true, null); String namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + (dfs.getFileSystem()).getUri().getPort(); // shut down the data nodes @@ -575,4 +590,91 @@ public class TestRecoveryManager { Assert.assertFalse("JobTracker failed to create info files with datanodes!", failed); } + + static void mkdirWithPerms(FileSystem fs, String dir, short mode) throws IOException { + Path p = new Path(dir); + fs.mkdirs(p); + fs.setPermission(p, new FsPermission(mode)); + } + + @Test(timeout=120000) + public void testJobResubmissionAsDifferentUser() throws 
Exception { + LOG.info("Testing Job Resubmission as a different user to the jobtracker"); + + final Path HDFS_TEST_DIR = new Path("/tmp"); + + JobConf conf = new JobConf(); + + dfs = new MiniDFSCluster(conf, 1, true, null); + fs = dfs.getFileSystem(); + + conf.set("mapreduce.jobtracker.staging.root.dir", "/user"); + conf.set("mapred.system.dir", "/mapred"); + String mapredSysDir = conf.get("mapred.system.dir"); + mkdirWithPerms(fs, mapredSysDir, (short)0700); + fs.setOwner(new Path(mapredSysDir), + UserGroupInformation.getCurrentUser().getUserName(), "mrgroup"); + + mkdirWithPerms(fs, "/user", (short)0777); + mkdirWithPerms(fs, "/mapred", (short)0777); + mkdirWithPerms(fs, "/tmp", (short)0777); + + mr = + new MiniMRCluster( + 1, dfs.getFileSystem().getUri().toString(), 1, null, null, conf); + + String signalFile = new Path(HDFS_TEST_DIR, "signal").toString(); + + // make sure that the jobtracker is in recovery mode + mr.getJobTrackerConf() + .setBoolean("mapred.jobtracker.restart.recover", true); + + JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker(); + + final JobConf job1 = mr.createJobConf(); + UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"), + new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile, + signalFile); + + UserGroupInformation ugi = + UserGroupInformation.createUserForTesting("bob", new String[]{"users"}); + job1.setUser(ugi.getUserName()); + + JobClient jc = new JobClient(job1); + RunningJob rJob1 = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() { + public RunningJob run() throws IOException { + JobClient jc = new JobClient(job1); + return jc.submitJob(job1); + } + }); + LOG.info("Submitted first job " + rJob1.getID()); + + while (rJob1.mapProgress() < 0.5f) { + LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done"); + UtilsForTests.waitFor(100); + } + + // kill the jobtracker + LOG.info("Stopping jobtracker"); + mr.stopJobTracker(); + + // start the jobtracker + 
LOG.info("Starting jobtracker"); + mr.startJobTracker(); + UtilsForTests.waitForJobTracker(jc); + + jobtracker = mr.getJobTrackerRunner().getJobTracker(); + // assert that job is recovered by the jobtracker + Assert.assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length); + JobInProgress jip = jobtracker.getJob(rJob1.getID()); + + // Signaling Map task to complete + fs.create(new Path(HDFS_TEST_DIR, "signal")); + while (!jip.isComplete()) { + LOG.info("Waiting for job " + rJob1.getID() + " to be successful"); + UtilsForTests.waitFor(100); + } + rJob1 = jc.getJob(rJob1.getID()); + Assert.assertTrue("Task should be successful", rJob1.isSuccessful()); + } }