Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue Jun 5 02:33:44 2012 @@ -485,7 +485,7 @@ public class TestTaskTrackerMemoryManage // tree rooted at 100 is over limit immediately, as it is // twice over the mem limit. ProcfsBasedProcessTree pTree = new ProcfsBasedProcessTree( - "100", true, 100L, + "100", true, procfsRootDir.getAbsolutePath()); pTree.getProcessTree(); assertTrue("tree rooted at 100 should be over limit " + @@ -493,7 +493,7 @@ public class TestTaskTrackerMemoryManage test.isProcessTreeOverLimit(pTree, "dummyId", limit)); // the tree rooted at 200 is initially below limit. - pTree = new ProcfsBasedProcessTree("200", true, 100L, + pTree = new ProcfsBasedProcessTree("200", true, procfsRootDir.getAbsolutePath()); pTree.getProcessTree(); assertFalse("tree rooted at 200 shouldn't be over limit " + @@ -506,7 +506,7 @@ public class TestTaskTrackerMemoryManage test.isProcessTreeOverLimit(pTree, "dummyId", limit)); // the tree rooted at 600 is never over limit. - pTree = new ProcfsBasedProcessTree("600", true, 100L, + pTree = new ProcfsBasedProcessTree("600", true, procfsRootDir.getAbsolutePath()); pTree.getProcessTree(); assertFalse("tree rooted at 600 should never be over limit.",
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Tue Jun 5 02:33:44 2012 @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController; import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager; @@ -64,7 +65,7 @@ public class TestTrackerDistributedCache String execPath = path + "/task-controller"; ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath); taskController.setConf(conf); - taskController.setup(); + taskController.setup(new LocalDirAllocator("mapred.local.dir")); } @Override Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java Tue Jun 5 02:33:44 2012 @@ -27,7 +27,8 @@ import org.apache.hadoop.mapred.UtilsFor import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; -import org.apache.hadoop.mapreduce.util.MRAsyncDiskService; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; import static org.junit.Assert.*; @@ -47,12 +48,18 @@ public class TestUserLogCleanup { private JobID jobid4 = new JobID(jtid, 4); private File foo = new File(TaskLog.getUserLogDir(), "foo"); private File bar = new File(TaskLog.getUserLogDir(), "bar"); + private TaskController taskController; public TestUserLogCleanup() throws IOException { Configuration conf = new Configuration(); - localizer = new Localizer(FileSystem.get(conf), conf - .getStrings(MRConfig.LOCAL_DIR), new DefaultTaskController()); - taskLogCleanupThread = new UserLogCleaner(conf); + localizer = + new Localizer(FileSystem.get(conf), conf.getStrings(MRConfig.LOCAL_DIR)); + Class<? extends TaskController> taskControllerClass = + conf.getClass("mapred.task.tracker.task-controller", + DefaultTaskController.class, TaskController.class); + taskController = + (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf); + taskLogCleanupThread = new UserLogCleaner(conf, taskController); taskLogCleanupThread.setClock(myClock); tt = new TaskTracker(); tt.setConf(new JobConf(conf)); @@ -66,11 +73,10 @@ public class TestUserLogCleanup { } private File localizeJob(JobID jobid) throws IOException { + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + new JobLocalizer(tt.getJobConf(), user, + jobid.toString()).initializeJobLogDir(); File jobUserlog = TaskLog.getJobDir(jobid); - - JobConf conf = new JobConf(); - // localize job log directory - tt.initializeJobLogDir(jobid, conf); assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists()); return jobUserlog; } @@ -103,16 +109,17 @@ public class TestUserLogCleanup { // add the job for deletion with one hour as retain hours jobFinished(jobid2, 1); - // remove old logs and see jobid1 is not removed and jobid2 is removed myClock.advance(ONE_HOUR); taskLogCleanupThread.processCompletedJobs(); + retry(jobUserlog2); + assertTrue(jobUserlog1 + " got deleted", jobUserlog1.exists()); - assertFalse(jobUserlog2 + " still exists.", jobUserlog2.exists()); - + assertFalse(jobUserlog2 + " still exists.", jobUserlog2.exists()); myClock.advance(ONE_HOUR); // remove old logs and see jobid1 is removed now taskLogCleanupThread.processCompletedJobs(); + retry(jobUserlog1); assertFalse(jobUserlog1 + " still exists.", jobUserlog1.exists()); } @@ -151,18 +158,18 @@ public class TestUserLogCleanup { Configuration conf = new Configuration(); conf.setInt(MRJobConfig.USER_LOG_RETAIN_HOURS, 3); taskLogCleanupThread.clearOldUserLogs(conf); + retry(foo, bar); assertFalse(foo.exists()); assertFalse(bar.exists()); assertTrue(jobUserlog1.exists()); assertTrue(jobUserlog2.exists()); assertTrue(jobUserlog3.exists()); assertTrue(jobUserlog4.exists()); - assertTrue(new File(TaskLog.getUserLogDir(), MRAsyncDiskService.TOBEDELETED) - .exists()); myClock.advance(ONE_HOUR); // time is now 2. taskLogCleanupThread.processCompletedJobs(); + retry(jobUserlog1); assertFalse(jobUserlog1.exists()); assertTrue(jobUserlog2.exists()); assertTrue(jobUserlog3.exists()); @@ -175,29 +182,31 @@ public class TestUserLogCleanup { jobFinished(jobid3, 3); // mimic localizeJob for jobid4 - jobUserlog4 = localizeJob(jobid4); + //jobUserlog4 = localizeJob(jobid4); // do cleanup myClock.advance(2 * ONE_HOUR); // time is now 4. taskLogCleanupThread.processCompletedJobs(); + retry(jobUserlog1, jobUserlog2, jobUserlog4); // jobid2 will be deleted assertFalse(jobUserlog1.exists()); assertFalse(jobUserlog2.exists()); assertTrue(jobUserlog3.exists()); - assertTrue(jobUserlog4.exists()); + assertFalse(jobUserlog4.exists()); myClock.advance(ONE_HOUR); // time is now 5. // do cleanup again taskLogCleanupThread.processCompletedJobs(); + retry(jobUserlog1, jobUserlog2, jobUserlog3); // jobid3 will be deleted assertFalse(jobUserlog1.exists()); assertFalse(jobUserlog2.exists()); assertFalse(jobUserlog3.exists()); - assertTrue(jobUserlog4.exists()); + assertFalse(jobUserlog4.exists()); } /** @@ -232,7 +241,7 @@ public class TestUserLogCleanup { // job directories will be added with 3 hours as retain hours. Configuration conf = new Configuration(); conf.setInt(MRJobConfig.USER_LOG_RETAIN_HOURS, 3); - taskLogCleanupThread = new UserLogCleaner(conf); + taskLogCleanupThread = new UserLogCleaner(conf, taskController); myClock = new FakeClock(); // clock is reset. taskLogCleanupThread.setClock(myClock); taskLogCleanupThread.clearOldUserLogs(conf); @@ -243,8 +252,6 @@ public class TestUserLogCleanup { assertTrue(jobUserlog2.exists()); assertTrue(jobUserlog3.exists()); assertTrue(jobUserlog4.exists()); - assertTrue(new File(TaskLog.getUserLogDir(), MRAsyncDiskService.TOBEDELETED) - .exists()); myClock.advance(ONE_HOUR); // time is now 1. @@ -267,22 +274,42 @@ public class TestUserLogCleanup { myClock.advance(2 * ONE_HOUR); // time is now 3. taskLogCleanupThread.processCompletedJobs(); + retry(jobUserlog1, jobUserlog2, jobUserlog4); // jobid1 and jobid2 will be deleted assertFalse(jobUserlog1.exists()); assertFalse(jobUserlog2.exists()); assertTrue(jobUserlog3.exists()); - assertTrue(jobUserlog4.exists()); + assertFalse(jobUserlog4.exists()); myClock.advance(ONE_HOUR); // time is now 4. // do cleanup again taskLogCleanupThread.processCompletedJobs(); - + retry(jobUserlog1, jobUserlog2, jobUserlog3, jobUserlog4); + // jobid3 will be deleted assertFalse(jobUserlog1.exists()); assertFalse(jobUserlog2.exists()); assertFalse(jobUserlog3.exists()); - assertTrue(jobUserlog4.exists()); + assertFalse(jobUserlog4.exists()); + } + + private void retry(File... jobDirs) { + //since the deletion is done by a thread, we poll for sometime + short retries = 0; + while (retries++ < 20) { + boolean exist = false; + for (File dir : jobDirs) { + if (dir.exists()) { + exist = true; + } + } + if (exist) { + try { + Thread.sleep(500); + } catch (InterruptedException ie){} + } else return; + } } } Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Jun 5 02:33:44 2012 @@ -47,6 +47,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; @@ -661,7 +662,7 @@ public class UtilsForTests { * asynchronously. */ public static class InlineCleanupQueue extends CleanupQueue { - List<String> stalePaths = new ArrayList<String>(); + List<Path> stalePaths = new ArrayList<Path>(); public InlineCleanupQueue() { // do nothing @@ -671,19 +672,37 @@ public class UtilsForTests { public void addToQueue(PathDeletionContext... contexts) { // delete paths in-line for (PathDeletionContext context : contexts) { + Exception exc = null; try { if (!deletePath(context)) { LOG.warn("Stale path " + context.fullPath); stalePaths.add(context.fullPath); } } catch (IOException e) { + exc = e; + } catch (InterruptedException ie) { + exc = ie; + } + if (exc != null) { LOG.warn("Caught exception while deleting path " + context.fullPath); - LOG.info(StringUtils.stringifyException(e)); + LOG.info(StringUtils.stringifyException(exc)); stalePaths.add(context.fullPath); } } } + static boolean deletePath(PathDeletionContext context) + throws IOException, InterruptedException { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to delete " + context.fullPath); + } +// FileSystem fs = context.fullPath.getFileSystem(context.conf); +// if (fs.exists(context.fullPath)) { +// return fs.delete(context.fullPath, true); +// } + context.deletePath(); + return true; + } } static class FakeClock extends Clock { Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Tue Jun 5 02:33:44 2012 @@ -32,8 +32,12 @@ import javax.security.auth.login.LoginEx import junit.framework.TestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.DefaultTaskController; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.JobLocalizer; import org.apache.hadoop.mapred.TaskController; import org.apache.hadoop.mapred.TaskTracker; import org.apache.hadoop.mapreduce.Cluster; @@ -41,6 +45,9 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager.CacheFile; +import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager.CacheStatus; +import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -54,10 +61,10 @@ import org.apache.hadoop.mapreduce.filec import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; -import org.mortbay.log.Log; public class TestTrackerDistributedCacheManager extends TestCase { - + private static final Log LOG = + LogFactory.getLog(TestTrackerDistributedCacheManager.class); protected String TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp"), TestTrackerDistributedCacheManager.class.getSimpleName()) @@ -68,10 +75,12 @@ public class TestTrackerDistributedCache private static final int TEST_FILE_SIZE = 4 * 1024; // 4K private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K - private static final int LOCAL_CACHE_SUBDIR = 2; + private static final int LOCAL_CACHE_SUBDIR = 1; protected Configuration conf; protected Path firstCacheFile; + protected Path firstCacheFilePublic; protected Path secondCacheFile; + protected Path secondCacheFilePublic; private FileSystem fs; protected LocalDirAllocator localDirAllocator = @@ -119,18 +128,22 @@ public class TestTrackerDistributedCache taskControllerClass, conf); // setup permissions for mapred local dir - taskController.setup(); + taskController.setup(localDirAllocator); // Create the temporary cache files to be used in the tests. firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile"); secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile"); + firstCacheFilePublic = new Path(TEST_ROOT_DIR, "firstcachefileOne"); + secondCacheFilePublic = new Path(TEST_ROOT_DIR, "secondcachefileOne"); + createPublicTempFile(firstCacheFilePublic); + createPublicTempFile(secondCacheFilePublic); createPrivateTempFile(firstCacheFile); createPrivateTempFile(secondCacheFile); } protected void refreshConf(Configuration conf) throws IOException { taskController.setConf(conf); - taskController.setup(); + taskController.setup(localDirAllocator); } /** @@ -148,7 +161,8 @@ public class TestTrackerDistributedCache * @throws IOException * @throws LoginException */ - public void testManagerFlow() throws IOException, LoginException { + public void testManagerFlow() + throws IOException, LoginException, InterruptedException { if (!canRun()) { return; } @@ -158,6 +172,7 @@ public class TestTrackerDistributedCache Configuration subConf = new Configuration(conf); String userName = getJobOwnerName(); subConf.set(MRJobConfig.USER_NAME, userName); + JobID jobid = new JobID("jt",1); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); DistributedCache.addFileToClassPath(secondCacheFile, subConf); TrackerDistributedCacheManager.determineTimestamps(subConf); @@ -171,15 +186,18 @@ public class TestTrackerDistributedCache // ****** Imitate TaskRunner code. TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf, taskController); + new TrackerDistributedCacheManager(conf); TaskDistributedCacheManager handle = - manager.newTaskDistributedCacheManager(subConf); + manager.newTaskDistributedCacheManager(jobid, subConf); assertNull(null, DistributedCache.getLocalCacheFiles(subConf)); File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString()); - handle.setup(localDirAllocator, workDir, TaskTracker - .getPrivateDistributedCacheDir(userName), - TaskTracker.getPublicDistributedCacheDir()); - // ****** End of imitating TaskRunner code + handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), + TaskTracker.getPrivateDistributedCacheDir(userName)); + JobLocalizer.downloadPrivateCache(subConf); + // DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO) +// handle.setupPrivateCache(localDirAllocator, TaskTracker +// .getPrivateDistributedCacheDir(userName)); +// // ****** End of imitating TaskRunner code Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf); assertNotNull(null, localCacheFiles); @@ -208,18 +226,21 @@ public class TestTrackerDistributedCache TrackerDistributedCacheManager { public FakeTrackerDistributedCacheManager(Configuration conf) throws IOException { - super(conf, taskController); + super(conf); } @Override - Path localizeCache(Configuration conf, URI cache, long confFileStamp, - CacheStatus cacheStatus, boolean isArchive, boolean isPublic) - throws IOException { - if (cache.equals(firstCacheFile.toUri())) { + Path localizePublicCacheObject(Configuration conf, URI cache, + long confFileStamp, + CacheStatus cacheStatus, + FileStatus fileStatus, + boolean isArchive) throws IOException, InterruptedException { + if (cache.equals(firstCacheFilePublic.toUri())) { throw new IOException("fake fail"); } - return super.localizeCache(conf, cache, confFileStamp, cacheStatus, - isArchive, isPublic); + return super.localizePublicCacheObject(conf, cache, confFileStamp, + cacheStatus, fileStatus, + isArchive); } } @@ -230,57 +251,58 @@ public class TestTrackerDistributedCache } TrackerDistributedCacheManager manager = new FakeTrackerDistributedCacheManager(conf); - Cluster cluster = new Cluster(conf); + String userName = getJobOwnerName(); File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString()); // Configures a job with a regular file - Job job1 = Job.getInstance(cluster, conf); - job1.setUser(userName); - job1.addCacheFile(secondCacheFile.toUri()); + Job job1 = new Job(conf); Configuration conf1 = job1.getConfiguration(); + conf1.set("user.name", userName); + DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1); + TrackerDistributedCacheManager.determineTimestamps(conf1); TrackerDistributedCacheManager.determineCacheVisibilities(conf1); // Task localizing for first job TaskDistributedCacheManager handle = manager - .newTaskDistributedCacheManager(conf1); - handle.setup(localDirAllocator, workDir, TaskTracker - .getPrivateDistributedCacheDir(userName), - TaskTracker.getPublicDistributedCacheDir()); + .newTaskDistributedCacheManager(new JobID("jt", 1), conf1); + handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(), + TaskTracker.getPrivateDistributedCacheDir(userName)); + JobLocalizer.downloadPrivateCache(conf1); handle.release(); for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) { - assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp, - c.owner)); + assertEquals(0, manager.getReferenceCount(c.getStatus())); } Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile"); createPrivateTempFile(thirdCacheFile); // Configures another job with three regular files. - Job job2 = Job.getInstance(cluster, conf); - job2.setUser(userName); + Job job2 = new Job(conf); + Configuration conf2 = job2.getConfiguration(); + conf2.set("user.name", userName); // add a file that would get failed to localize - job2.addCacheFile(firstCacheFile.toUri()); + DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf2); // add a file that is already localized by different job - job2.addCacheFile(secondCacheFile.toUri()); + DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2); // add a file that is never localized - job2.addCacheFile(thirdCacheFile.toUri()); - Configuration conf2 = job2.getConfiguration(); + DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2); + TrackerDistributedCacheManager.determineTimestamps(conf2); TrackerDistributedCacheManager.determineCacheVisibilities(conf2); // Task localizing for second job // localization for the "firstCacheFile" will fail. - handle = manager.newTaskDistributedCacheManager(conf2); + handle = manager.newTaskDistributedCacheManager(new JobID("jt", 2), conf2); Throwable th = null; try { - handle.setup(localDirAllocator, workDir, TaskTracker - .getPrivateDistributedCacheDir(userName), - TaskTracker.getPublicDistributedCacheDir()); + handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(), + TaskTracker.getPrivateDistributedCacheDir(userName)); + JobLocalizer.downloadPrivateCache(conf2); } catch (IOException e) { th = e; - Log.info("Exception during setup", e); + LOG.info("Exception during setup", e); } assertNotNull(th); assertTrue(th.getMessage().contains("fake fail")); @@ -288,15 +310,15 @@ public class TestTrackerDistributedCache th = null; for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) { try { - assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp, - c.owner)); - } catch (IOException ie) { + int refcount = manager.getReferenceCount(c.getStatus()); + LOG.info("checking refcount " + c.uri + " of " + refcount); + assertEquals(0, refcount); + } catch (NullPointerException ie) { th = ie; - Log.info("Exception getting reference count for " + c.uri, ie); + LOG.info("Exception getting reference count for " + c.uri, ie); } } assertNotNull(th); - assertTrue(th.getMessage().contains(thirdCacheFile.getName())); fs.delete(thirdCacheFile, false); } @@ -361,7 +383,7 @@ public class TestTrackerDistributedCache private Path checkLocalizedPath(boolean visibility) throws IOException, LoginException, InterruptedException { TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf, taskController); + new TrackerDistributedCacheManager(conf); Cluster cluster = new Cluster(conf); String userName = getJobOwnerName(); File workDir = new File(TEST_ROOT_DIR, "workdir"); @@ -381,10 +403,10 @@ public class TestTrackerDistributedCache // Task localizing for job TaskDistributedCacheManager handle = manager - .newTaskDistributedCacheManager(conf1); - handle.setup(localDirAllocator, workDir, TaskTracker - .getPrivateDistributedCacheDir(userName), - TaskTracker.getPublicDistributedCacheDir()); + .newTaskDistributedCacheManager(new JobID("jt", 1), conf1); + handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(), + TaskTracker.getPrivateDistributedCacheDir(userName)); + JobLocalizer.downloadPrivateCache(conf1); TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0); String distCacheDir; if (visibility) { @@ -394,9 +416,8 @@ public class TestTrackerDistributedCache } Path localizedPath = manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir, - fs.getFileStatus(cacheFile), false, - c.timestamp, new Path(TEST_ROOT_DIR), false, - visibility); + fs.getFileStatus(cacheFile), false, + c.timestamp, visibility, c); assertTrue("Cache file didn't get localized in the expected directory. " + "Expected localization to happen within " + ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir + @@ -504,56 +525,94 @@ public class TestTrackerDistributedCache Configuration conf2 = new Configuration(conf); conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString()); conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT); - conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR); conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms refreshConf(conf2); TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf2, taskController); + new TrackerDistributedCacheManager(conf2); manager.startCleanupThread(); FileSystem localfs = FileSystem.getLocal(conf2); String userName = getJobOwnerName(); conf2.set(MRJobConfig.USER_NAME, userName); // We first test the size limit - Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, + FileStatus stat = fs.getFileStatus(firstCacheFilePublic); + CacheFile cfile1 = new CacheFile(firstCacheFilePublic.toUri(), + CacheFile.FileType.REGULAR, true, + stat.getModificationTime(), + true); + Path firstLocalCache = manager.getLocalCache(firstCacheFilePublic.toUri(), conf2, TaskTracker.getPrivateDistributedCacheDir(userName), - fs.getFileStatus(firstCacheFile), false, - getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false); - manager.releaseCache(firstCacheFile.toUri(), conf2, - getFileStamp(firstCacheFile), - TrackerDistributedCacheManager.getLocalizedCacheOwner(false)); + fs.getFileStatus(firstCacheFilePublic), false, + fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true, + cfile1); + manager.releaseCache(cfile1.getStatus()); //in above code,localized a file of size 4K and then release the cache // which will cause the cache be deleted when the limit goes out. // The below code localize another cache which's designed to //sweep away the first cache. - manager.getLocalCache(secondCacheFile.toUri(), conf2, + stat = fs.getFileStatus(secondCacheFilePublic); + CacheFile cfile2 = new CacheFile(secondCacheFilePublic.toUri(), + CacheFile.FileType.REGULAR, true, + stat.getModificationTime(), + true); + assertTrue("DistributedCache currently doesn't have cached file", + localfs.exists(firstLocalCache)); + Path secondLocalCache = manager.getLocalCache(secondCacheFilePublic.toUri(), conf2, TaskTracker.getPrivateDistributedCacheDir(userName), - fs.getFileStatus(secondCacheFile), false, - getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, false); - checkCacheDeletion(localfs, localCache, "DistributedCache failed " + + fs.getFileStatus(secondCacheFilePublic), false, + fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true, + cfile2); + checkCacheDeletion(localfs, firstLocalCache, "DistributedCache failed " + "deleting old cache when the cache store is full."); + manager.stopCleanupThread(); // Now we test the number of sub directories limit + conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR); + conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT * 10); + conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms + manager = + new TrackerDistributedCacheManager(conf2); + manager.startCleanupThread(); // Create the temporary cache files to be used in the tests. Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile"); Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile"); // Adding two more small files, so it triggers the number of sub directory // limit but does not trigger the file size limit. - createTempFile(thirdCacheFile, 1); - createTempFile(fourthCacheFile, 1); + createPrivateTempFile(thirdCacheFile); + createPrivateTempFile(fourthCacheFile); + DistributedCache.setCacheFiles(new URI[]{thirdCacheFile.toUri()}, conf2); + TrackerDistributedCacheManager.determineCacheVisibilities(conf2); + TrackerDistributedCacheManager.determineTimestamps(conf2); + stat = fs.getFileStatus(thirdCacheFile); + CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(), + CacheFile.FileType.REGULAR, false, + stat.getModificationTime(), + true); Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(thirdCacheFile), false, - getFileStamp(thirdCacheFile), new Path(TEST_ROOT_DIR), false, false); + fs.getFileStatus(thirdCacheFile).getModificationTime(), + false, cfile3); + DistributedCache.setLocalFiles(conf2, thirdLocalCache.toString()); + JobLocalizer.downloadPrivateCache(conf2); // Release the third cache so that it can be deleted while sweeping - manager.releaseCache(thirdCacheFile.toUri(), conf2, - getFileStamp(thirdCacheFile), - TrackerDistributedCacheManager.getLocalizedCacheOwner(false)); + manager.releaseCache(cfile3.getStatus()); // Getting the fourth cache will make the number of sub directories becomes // 3 which is greater than 2. So the released cache will be deleted. - manager.getLocalCache(fourthCacheFile.toUri(), conf2, + stat = fs.getFileStatus(fourthCacheFile); + CacheFile cfile4 = new CacheFile(fourthCacheFile.toUri(), + CacheFile.FileType.REGULAR, false, + stat.getModificationTime(), + true); + assertTrue("DistributedCache currently doesn't have cached file", + localfs.exists(thirdLocalCache)); + DistributedCache.setCacheFiles(new URI[]{fourthCacheFile.toUri()}, conf2); + DistributedCache.setLocalFiles(conf2, thirdCacheFile.toUri().toString()); + TrackerDistributedCacheManager.determineCacheVisibilities(conf2); + TrackerDistributedCacheManager.determineTimestamps(conf2); + Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(fourthCacheFile), false, - getFileStamp(fourthCacheFile), new Path(TEST_ROOT_DIR), false, false); + fs.getFileStatus(fourthCacheFile).getModificationTime(), false, cfile4); checkCacheDeletion(localfs, thirdLocalCache, "DistributedCache failed deleting old" + " cache when the cache exceeds the number of sub directories limit."); @@ -587,17 +646,20 @@ public class TestTrackerDistributedCache return; } TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf, taskController); + new TrackerDistributedCacheManager(conf); conf.set("fs.fakefile.impl", conf.get("fs.file.impl")); String userName = getJobOwnerName(); conf.set(MRJobConfig.USER_NAME, userName); Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath()); + CacheFile file = new CacheFile(fileToCache.toUri(), + CacheFile.FileType.REGULAR, + false, 0, false); Path result = manager.getLocalCache(fileToCache.toUri(), conf, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(firstCacheFile), false, - getFileStamp(firstCacheFile), - new Path(TEST_ROOT_DIR), false, false); + System.currentTimeMillis(), + false, file); assertNotNull("DistributedCache cached file on non-default filesystem.", result); } @@ -632,6 +694,8 @@ public class TestTrackerDistributedCache protected void tearDown() throws IOException { new File(firstCacheFile.toString()).delete(); new File(secondCacheFile.toString()).delete(); + new File(firstCacheFilePublic.toString()).delete(); + new File(secondCacheFilePublic.toString()).delete(); FileUtil.fullyDelete(new File(TEST_ROOT_DIR)); } @@ -652,9 +716,13 @@ public class TestTrackerDistributedCache } public FileStatus getFileStatus(Path p) throws IOException { + FileStatus rawFileStatus = super.getFileStatus(p); File f = pathToFile(p); - return new FileStatus(f.length(), f.isDirectory(), 1, 128, - f.lastModified() + increment, makeQualified(new Path(f.getPath()))); + FileStatus status = new FileStatus(f.length(), f.isDirectory(), 1, 128, + f.lastModified() + increment, 0, + rawFileStatus.getPermission(), rawFileStatus.getOwner(), + rawFileStatus.getGroup(), makeQualified(new Path(f.getPath()))); + return status; } void advanceClock(long millis) { @@ -672,7 +740,7 @@ public class TestTrackerDistributedCache String userName = getJobOwnerName(); TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(myConf, taskController); + new TrackerDistributedCacheManager(myConf); // ****** Imitate JobClient code // Configures a task/job with both a regular file and a "classpath" file. Configuration subConf = new Configuration(myConf); @@ -684,14 +752,16 @@ public class TestTrackerDistributedCache // ****** Imitate TaskRunner code. TaskDistributedCacheManager handle = - manager.newTaskDistributedCacheManager(subConf); + manager.newTaskDistributedCacheManager(new JobID("jt", 1), subConf); assertNull(null, DistributedCache.getLocalCacheFiles(subConf)); File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString()); - handle.setup(localDirAllocator, workDir, TaskTracker - .getPrivateDistributedCacheDir(userName), - TaskTracker.getPublicDistributedCacheDir()); + handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), + TaskTracker.getPrivateDistributedCacheDir(userName)); + //TODO this doesn't really happen in the TaskRunner +// handle.setupPrivateCache(localDirAllocator, TaskTracker +// .getPrivateDistributedCacheDir(userName)); // ****** End of imitating TaskRunner code - + JobLocalizer.downloadPrivateCache(subConf); Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf); assertNotNull(null, localCacheFiles); assertEquals(1, localCacheFiles.length); @@ -709,8 +779,10 @@ public class TestTrackerDistributedCache // running a task of the same job Throwable th = null; try { - handle.setup(localDirAllocator, workDir, TaskTracker - .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir()); + handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), + TaskTracker.getPrivateDistributedCacheDir(userName)); +// handle.setupPrivateCache(localDirAllocator, TaskTracker +// .getPrivateDistributedCacheDir(userName)); } catch (IOException ie) { th = ie; } @@ -723,21 +795,22 @@ public class TestTrackerDistributedCache // running a task of the same job on another TaskTracker which has never // initialized the cache TrackerDistributedCacheManager manager2 = - new TrackerDistributedCacheManager(myConf, taskController); + new TrackerDistributedCacheManager(myConf); TaskDistributedCacheManager handle2 = - manager2.newTaskDistributedCacheManager(subConf); + manager2.newTaskDistributedCacheManager(new JobID("jt", 1), subConf); File workDir2 = new File(new Path(TEST_ROOT_DIR, "workdir2").toString()); th = null; try { - handle2.setup(localDirAllocator, workDir2, TaskTracker + handle2.setupCache(subConf, TaskTracker .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir()); + JobLocalizer.downloadPrivateCache(subConf); } catch (IOException ie) { th = ie; } assertNotNull("Throwable is null", th); assertTrue("Exception message does not match", - th.getMessage().contains("has changed on HDFS since job started")); + th.getMessage().contains("changed during the job from")); // release handle.release(); @@ -749,9 +822,10 @@ public class TestTrackerDistributedCache TrackerDistributedCacheManager.determineCacheVisibilities(subConf2); handle = - manager.newTaskDistributedCacheManager(subConf2); - handle.setup(localDirAllocator, workDir, TaskTracker - .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir()); + manager.newTaskDistributedCacheManager(new JobID("jt", 2), subConf2); + handle.setupCache(subConf2, TaskTracker.getPublicDistributedCacheDir(), + TaskTracker.getPrivateDistributedCacheDir(userName)); + JobLocalizer.downloadPrivateCache(subConf2); Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2); assertNotNull(null, localCacheFiles2); assertEquals(1, localCacheFiles2.length); @@ -781,26 +855,36 @@ public class TestTrackerDistributedCache String userName = getJobOwnerName(); conf.set(MRJobConfig.USER_NAME, userName); TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf, taskController); + new TrackerDistributedCacheManager(conf); FileSystem localfs = FileSystem.getLocal(conf); + long now = System.currentTimeMillis(); Path[] localCache = new Path[2]; - localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, + FileStatus stat = fs.getFileStatus(firstCacheFile); + CacheFile file = new CacheFile(firstCacheFilePublic.toUri(), + CacheFile.FileType.REGULAR, true, + stat.getModificationTime(), false); + localCache[0] = manager.getLocalCache(firstCacheFilePublic.toUri(), conf, TaskTracker.getPrivateDistributedCacheDir(userName), - fs.getFileStatus(firstCacheFile), false, - getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false); + fs.getFileStatus(firstCacheFilePublic), false, + fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true, + file); FsPermission myPermission = new FsPermission((short)0600); Path myFile = new Path(localCache[0].getParent(), "myfile.txt"); if (FileSystem.create(localfs, myFile, myPermission) == null) { throw new IOException("Could not create " + myFile); } try { - localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, + stat = fs.getFileStatus(secondCacheFilePublic); + file = new CacheFile(secondCacheFilePublic.toUri(), + CacheFile.FileType.REGULAR, + true, stat.getModificationTime(), false); + localCache[1] = manager.getLocalCache(secondCacheFilePublic.toUri(), conf, TaskTracker.getPrivateDistributedCacheDir(userName), - fs.getFileStatus(secondCacheFile), false, - getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, - false); - FileStatus stat = localfs.getFileStatus(myFile); + fs.getFileStatus(secondCacheFilePublic), false, + fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true, + file); + stat = localfs.getFileStatus(myFile); assertTrue(stat.getPermission().equals(myPermission)); // validate permissions of localized files. checkFilePermissions(localCache); Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java Tue Jun 5 02:33:44 2012 @@ -35,9 +35,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.DefaultTaskController; +import org.apache.hadoop.mapred.TaskController; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Shell.ExitCodeException; import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import static org.apache.hadoop.mapred.TaskController.Signal; import junit.framework.TestCase; @@ -60,7 +63,7 @@ public class TestProcfsBasedProcessTree public void run() { try { Vector<String> args = new Vector<String>(); - if(ProcessTree.isSetsidAvailable) { + if(TaskController.isSetsidAvailable) { args.add("setsid"); } args.add("bash"); @@ -95,7 +98,7 @@ public class TestProcfsBasedProcessTree return getPidFromPidFile(pidFile); } - public void testProcessTree() { + public void testProcessTree() throws Exception { try { if (!ProcfsBasedProcessTree.isAvailable()) { @@ -149,8 +152,7 @@ public class TestProcfsBasedProcessTree String pid = getRogueTaskPID(); LOG.info("Root process pid: " + pid); ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid, - ProcessTree.isSetsidAvailable, - ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL); + TaskController.isSetsidAvailable); p = p.getProcessTree(); // initialize LOG.info("ProcessTree: " + p.toString()); @@ -171,13 +173,14 @@ public class TestProcfsBasedProcessTree String processTreeDump = p.getProcessTreeDump(); // destroy the process and all its subprocesses - p.destroy(true/*in the background*/); + TaskController tc = new DefaultTaskController(); + tc.signalTask(null, Integer.valueOf(pid), Signal.KILL); - if(ProcessTree.isSetsidAvailable) {// whole processtree should be gone - assertEquals(false, p.isAnyProcessInTreeAlive()); - } - else {// process should be gone - assertFalse("ProcessTree must have been gone", p.isAlive()); + if (TaskController.isSetsidAvailable) { // whole processtree should be gone + assertFalse("Proceesses in process group live", + p.isAnyProcessInTreeAlive(tc)); + } else {// process should be gone + assertFalse("ProcessTree must have been gone", p.isAlive(tc)); } LOG.info("Process-tree dump follows: \n" + processTreeDump); @@ -204,7 +207,7 @@ public class TestProcfsBasedProcessTree // ProcessTree is gone now. Any further calls should be sane. p = p.getProcessTree(); - assertFalse("ProcessTree must have been gone", p.isAlive()); + assertFalse("ProcessTree must have been gone", p.isAlive(tc)); assertTrue("Cumulative vmem for the gone-process is " + p.getCumulativeVmem() + " . It should be zero.", p .getCumulativeVmem() == 0); @@ -333,8 +336,8 @@ public class TestProcfsBasedProcessTree // crank up the process tree class. ProcfsBasedProcessTree processTree = - new ProcfsBasedProcessTree("100", true, 100L, - procfsRootDir.getAbsolutePath()); + new ProcfsBasedProcessTree("100", true, + procfsRootDir.getAbsolutePath()); // build the process tree. processTree.getProcessTree(); @@ -406,8 +409,8 @@ public class TestProcfsBasedProcessTree // crank up the process tree class. ProcfsBasedProcessTree processTree = - new ProcfsBasedProcessTree("100", true, 100L, - procfsRootDir.getAbsolutePath()); + new ProcfsBasedProcessTree("100", true, + procfsRootDir.getAbsolutePath()); // build the process tree. processTree.getProcessTree(); @@ -498,11 +501,11 @@ public class TestProcfsBasedProcessTree // crank up the process tree class. ProcfsBasedProcessTree processTree = new ProcfsBasedProcessTree( - pid, true, 100L, procfsRootDir.getAbsolutePath()); + pid, true, procfsRootDir.getAbsolutePath()); // Let us not create stat file for pid 100. assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch( - pid, procfsRootDir.getAbsolutePath())); + Integer.valueOf(pid), procfsRootDir.getAbsolutePath())); } finally { FileUtil.fullyDelete(procfsRootDir); } @@ -551,9 +554,8 @@ public class TestProcfsBasedProcessTree writeStatFiles(procfsRootDir, pids, procInfos); writeCmdLineFiles(procfsRootDir, pids, cmdLines); - ProcfsBasedProcessTree processTree = - new ProcfsBasedProcessTree("100", true, 100L, procfsRootDir - .getAbsolutePath()); + ProcfsBasedProcessTree processTree = new ProcfsBasedProcessTree( + "100", true, procfsRootDir.getAbsolutePath()); // build the process tree. processTree.getProcessTree(); Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/testshell/ExternalMapReduce.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/testshell/ExternalMapReduce.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/testshell/ExternalMapReduce.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/testshell/ExternalMapReduce.java Tue Jun 5 02:33:44 2012 @@ -18,6 +18,7 @@ package testshell; +import java.io.File; import java.io.IOException; import java.util.Iterator; @@ -91,6 +92,10 @@ public class ExternalMapReduce extends C if (ret != 0) { throw new IOException("files_tmp does not exist"); } + File file = new File("./ziplink/test.txt"); + if (!file.canExecute()) { + throw new IOException("ziplink/test.txt is not executable"); + } } } Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/unit/org/apache/hadoop/mapred/TestTaskTrackerDirectories.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/unit/org/apache/hadoop/mapred/TestTaskTrackerDirectories.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/test/unit/org/apache/hadoop/mapred/TestTaskTrackerDirectories.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/test/unit/org/apache/hadoop/mapred/TestTaskTrackerDirectories.java Tue Jun 5 02:33:44 2012 @@ -26,11 +26,12 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.hadoop.mapreduce.MRConfig; import org.junit.Test; import org.junit.Before; +import org.mockito.Mockito; /** * Tests for the correct behavior of the TaskTracker starting up with @@ -55,8 +56,8 @@ public class TestTaskTrackerDirectories TEST_DIR + "/local2" }; - conf.setStrings(MRConfig.LOCAL_DIR, dirs); - setupTaskController(conf); + conf.setStrings("mapred.local.dir", dirs); + setupTaskTracker(conf); for (String dir : dirs) { checkDir(dir); @@ -73,8 +74,8 @@ public class TestTaskTrackerDirectories new File(dirs[0]).mkdirs(); FileUtil.chmod(dirs[0], "000"); - conf.setStrings(MRConfig.LOCAL_DIR, dirs); - setupTaskController(conf); + conf.setStrings("mapred.local.dir", dirs); + setupTaskTracker(conf); for (String dir : dirs) { checkDir(dir); @@ -86,7 +87,7 @@ public class TestTaskTrackerDirectories File dir = TaskLog.getUserLogDir(); FileUtil.fullyDelete(dir); - setupTaskController(new Configuration()); + setupTaskTracker(new Configuration()); checkDir(dir.getAbsolutePath()); } @@ -103,7 +104,7 @@ public class TestTaskTrackerDirectories dir.createNewFile()); try { - setupTaskController(new Configuration()); + setupTaskTracker(new Configuration()); fail("Didn't throw!"); } catch (IOException ioe) { System.err.println("Got expected exception"); @@ -118,15 +119,20 @@ public class TestTaskTrackerDirectories dir.mkdirs(); FileUtil.chmod(dir.getAbsolutePath(), "000"); - setupTaskController(new Configuration()); + setupTaskTracker(new Configuration()); checkDir(dir.getAbsolutePath()); } - private void setupTaskController(Configuration conf) throws IOException { - TaskController tc = new DefaultTaskController(); - tc.setConf(conf); - tc.setup(); + private void setupTaskTracker(Configuration conf) throws Exception { + JobConf ttConf = new JobConf(conf); + // Doesn't matter what we give here - we won't actually + // connect to it. + TaskTracker tt = new TaskTracker(); + tt.setConf(ttConf); + tt.setTaskController(Mockito.mock(TaskController.class)); + fail("TODO: update this test case after 2178"); + // tt.initializeDirectories(); } private void checkDir(String dir) throws IOException {