Author: tucu
Date: Thu Jan 10 00:55:11 2013
New Revision: 1431168

URL: http://svn.apache.org/viewvc?rev=1431168&view=rev
Log:
MAPREDUCE-4907. TrackerDistributedCacheManager issues too many getFileStatus calls. (sandyr via tucu)

Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1431168&r1=1431167&r2=1431168&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Thu Jan 10 00:55:11 2013 @@ -154,6 +154,9 @@ Release 1.2.0 - unreleased HDFS-4320. Add a separate configuration for namenode rpc address instead of using fs.default.name. (Mostafa Elhemali via suresh) + MAPREDUCE-4907. TrackerDistributedCacheManager issues too many + getFileStatus calls. (sandyr via tucu) + OPTIMIZATIONS HDFS-2533. 
Backport: Remove needless synchronization on some FSDataSet Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1431168&r1=1431167&r2=1431168&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Thu Jan 10 00:55:11 2013 @@ -320,14 +320,15 @@ public class TrackerDistributedCacheMana * @return true if the path in the uri is visible to all, false otherwise * @throws IOException */ - static boolean isPublic(Configuration conf, URI uri) throws IOException { + static boolean isPublic(Configuration conf, URI uri, + Map<URI, FileStatus> statCache) throws IOException { FileSystem fs = FileSystem.get(uri, conf); Path current = new Path(uri.getPath()); //the leaf level file should be readable by others - if (!checkPermissionOfOther(fs, current, FsAction.READ)) { + if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { return false; } - return ancestorsHaveExecutePermissions(fs, current.getParent()); + return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache); } /** @@ -335,12 +336,12 @@ public class TrackerDistributedCacheMana * permission set for all users (i.e. 
that other users can traverse * the directory heirarchy to the given path) */ - static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path) - throws IOException { + static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path, + Map<URI, FileStatus> statCache) throws IOException { Path current = path; while (current != null) { //the subdirs in the path should have execute permissions for others - if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) { + if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) { return false; } current = current.getParent(); @@ -358,8 +359,8 @@ public class TrackerDistributedCacheMana * @throws IOException */ private static boolean checkPermissionOfOther(FileSystem fs, Path path, - FsAction action) throws IOException { - FileStatus status = fs.getFileStatus(path); + FsAction action, Map<URI, FileStatus> statCache) throws IOException { + FileStatus status = getFileStatus(fs, path, statCache); FsPermission perms = status.getPermission(); FsAction otherAction = perms.getOtherAction(); if (otherAction.implies(action)) { @@ -712,25 +713,69 @@ public class TrackerDistributedCacheMana } /** + * Gets the file status for the given URI. If the URI is in the cache, + * returns it. Otherwise, fetches it and adds it to the cache. 
+ */ + private static FileStatus getFileStatus(Configuration job, URI uri, + Map<URI, FileStatus> statCache) throws IOException { + FileStatus stat = statCache.get(uri); + if (stat == null) { + stat = DistributedCache.getFileStatus(job, uri); + statCache.put(uri, stat); + } + return stat; + } + + private static FileStatus getFileStatus(FileSystem fs, Path path, + Map<URI, FileStatus> statCache) throws IOException { + URI uri = path.toUri(); + FileStatus stat = statCache.get(uri); + if (stat == null) { + stat = fs.getFileStatus(path); + statCache.put(uri, stat); + } + return stat; + } + + /** * Determines timestamps of files to be cached, and stores those - * in the configuration. This is intended to be used internally by JobClient - * after all cache files have been added. + * in the configuration. Determines the visibilities of the distributed cache + * files and archives. The visibility of a cache path is "public" if the leaf + * component has READ permissions for others, and the parent subdirs have + * EXECUTE permissions for others. * * This is an internal method! * + * @param job + * @throws IOException + */ + public static void determineTimestampsAndCacheVisibilities(Configuration job) + throws IOException { + Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>(); + determineTimestamps(job, statCache); + determineCacheVisibilities(job, statCache); + } + + /** + * Determines timestamps of files to be cached, and stores those + * in the configuration. + * * @param job Configuration of a job. 
+ * @param statCache a cache of FileStatuses so that redundant remote + * calls can be avoided * @throws IOException */ - public static void determineTimestamps(Configuration job) throws IOException { + static void determineTimestamps(Configuration job, + Map<URI, FileStatus> statCache) throws IOException { URI[] tarchives = DistributedCache.getCacheArchives(job); if (tarchives != null) { - FileStatus status = DistributedCache.getFileStatus(job, tarchives[0]); + FileStatus status = getFileStatus(job, tarchives[0], statCache); StringBuffer archiveFileSizes = new StringBuffer(String.valueOf(status.getLen())); StringBuffer archiveTimestamps = new StringBuffer(String.valueOf(status.getModificationTime())); for (int i = 1; i < tarchives.length; i++) { - status = DistributedCache.getFileStatus(job, tarchives[i]); + status = getFileStatus(job, tarchives[i], statCache); archiveFileSizes.append(","); archiveFileSizes.append(String.valueOf(status.getLen())); archiveTimestamps.append(","); @@ -744,7 +789,7 @@ public class TrackerDistributedCacheMana URI[] tfiles = DistributedCache.getCacheFiles(job); if (tfiles != null) { - FileStatus status = DistributedCache.getFileStatus(job, tfiles[0]); + FileStatus status = getFileStatus(job, tfiles[0], statCache); StringBuffer fileSizes = new StringBuffer(String.valueOf(status.getLen())); StringBuffer fileTimestamps = new StringBuffer(String.valueOf( @@ -766,27 +811,29 @@ public class TrackerDistributedCacheMana * has READ permissions for others, and the parent subdirs have * EXECUTE permissions for others * @param job + * @param statCache a cache of FileStatuses so that redundant remote + * calls can be avoided * @throws IOException */ - public static void determineCacheVisibilities(Configuration job) - throws IOException { + static void determineCacheVisibilities(Configuration job, + Map<URI, FileStatus> statCache) throws IOException { URI[] tarchives = DistributedCache.getCacheArchives(job); if (tarchives != null) { StringBuffer 
archiveVisibilities = - new StringBuffer(String.valueOf(isPublic(job, tarchives[0]))); + new StringBuffer(String.valueOf(isPublic(job, tarchives[0], statCache))); for (int i = 1; i < tarchives.length; i++) { archiveVisibilities.append(","); - archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i]))); + archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i], statCache))); } setArchiveVisibilities(job, archiveVisibilities.toString()); } URI[] tfiles = DistributedCache.getCacheFiles(job); if (tfiles != null) { StringBuffer fileVisibilities = - new StringBuffer(String.valueOf(isPublic(job, tfiles[0]))); + new StringBuffer(String.valueOf(isPublic(job, tfiles[0], statCache))); for (int i = 1; i < tfiles.length; i++) { fileVisibilities.append(","); - fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i]))); + fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i], statCache))); } setFileVisibilities(job, fileVisibilities.toString()); } Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1431168&r1=1431167&r2=1431168&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java Thu Jan 10 00:55:11 2013 @@ -827,10 +827,9 @@ public class JobClient extends Configure // First we check whether the cached archives and files are legal. 
TrackerDistributedCacheManager.validate(job); - // set the timestamps of the archives and files - TrackerDistributedCacheManager.determineTimestamps(job); - // set the public/private visibility of the archives and files - TrackerDistributedCacheManager.determineCacheVisibilities(job); + // set the timestamps of the archives and files and set the + // public/private visibility of the archives and files + TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job); // get DelegationTokens for cache files TrackerDistributedCacheManager.getDelegationTokens(job, job.getCredentials()); Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1431168&r1=1431167&r2=1431168&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Thu Jan 10 00:55:11 2013 @@ -24,6 +24,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -108,7 +110,7 @@ public class TestTrackerDistributedCache assertTrue("Test root directory " + TEST_ROOT + " and all of its " + "parent directories must have a+x permissions", TrackerDistributedCacheManager.ancestorsHaveExecutePermissions( - fs, new Path(TEST_ROOT.toString()))); + fs, new Path(TEST_ROOT.toString()), new HashMap<URI, FileStatus>())); // Prepare the tests' mapred-local-dir ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, 
"mapred/local"); @@ -187,8 +189,11 @@ public class TestTrackerDistributedCache DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); DistributedCache.addFileToClassPath(secondCacheFile, subConf, FileSystem.get(subConf)); - TrackerDistributedCacheManager.determineTimestamps(subConf); - TrackerDistributedCacheManager.determineCacheVisibilities(subConf); + + Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>(); + TrackerDistributedCacheManager.determineTimestamps(subConf, statCache); + TrackerDistributedCacheManager.determineCacheVisibilities(subConf, statCache); + assertEquals(2, statCache.size()); // ****** End of imitating JobClient code Path jobFile = new Path(TEST_ROOT_DIR, "job.xml"); @@ -273,8 +278,7 @@ public class TestTrackerDistributedCache conf1.set("user.name", userName); DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1); - TrackerDistributedCacheManager.determineTimestamps(conf1); - TrackerDistributedCacheManager.determineCacheVisibilities(conf1); + TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1); // Task localizing for first job JobID jobId = new JobID("jt", 1); @@ -302,8 +306,7 @@ public class TestTrackerDistributedCache DistributedCache.addCacheFile(firstCacheDirPublic.toUri(), conf2); DistributedCache.addCacheFile(firstCacheDirPrivate.toUri(), conf2); - TrackerDistributedCacheManager.determineTimestamps(conf2); - TrackerDistributedCacheManager.determineCacheVisibilities(conf2); + TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2); // Task localizing for second job JobID job2Id = new JobID("jt", 2); @@ -339,8 +342,7 @@ public class TestTrackerDistributedCache // add a file that is never localized DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf3); - TrackerDistributedCacheManager.determineTimestamps(conf3); - TrackerDistributedCacheManager.determineCacheVisibilities(conf3); + TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf3); 
// Task localizing for third job // localization for the "firstCacheFile" will fail. @@ -379,7 +381,7 @@ public class TestTrackerDistributedCache * @throws LoginException */ public void testPublicPrivateCache() - throws IOException, LoginException, InterruptedException { + throws IOException, LoginException, InterruptedException, URISyntaxException { if (!canRun()) { return; } @@ -404,8 +406,7 @@ public class TestTrackerDistributedCache DistributedCache.addCacheFile(cacheFile.toUri(), conf1); DistributedCache.addCacheArchive(cacheFile.toUri(), conf1); - TrackerDistributedCacheManager.determineTimestamps(conf1); - TrackerDistributedCacheManager.determineCacheVisibilities(conf1); + TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1); dumpState(conf1); TaskDistributedCacheManager handle = manager @@ -491,7 +492,7 @@ public class TestTrackerDistributedCache } private void checkLocalizedPath(boolean visibility) - throws IOException, LoginException, InterruptedException { + throws IOException, LoginException, InterruptedException, URISyntaxException { TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(conf, taskController); String userName = getJobOwnerName(); @@ -506,8 +507,7 @@ public class TestTrackerDistributedCache Configuration conf1 = new Configuration(conf); conf1.set("user.name", userName); DistributedCache.addCacheFile(cacheFile.toUri(), conf1); - TrackerDistributedCacheManager.determineTimestamps(conf1); - TrackerDistributedCacheManager.determineCacheVisibilities(conf1); + TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1); dumpState(conf1); // Task localizing for job @@ -894,8 +894,7 @@ public class TestTrackerDistributedCache createPrivateTempFile(thirdCacheFile); createPrivateTempFile(fourthCacheFile); DistributedCache.setCacheFiles(new URI[]{thirdCacheFile.toUri()}, conf2); - TrackerDistributedCacheManager.determineCacheVisibilities(conf2); - 
TrackerDistributedCacheManager.determineTimestamps(conf2); + TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2); stat = fs.getFileStatus(thirdCacheFile); CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(), CacheFile.FileType.REGULAR, false, @@ -922,8 +921,7 @@ public class TestTrackerDistributedCache DistributedCache.setCacheFiles(new URI[]{fourthCacheFile.toUri()}, conf2); DistributedCache.setLocalFiles(conf2, thirdCacheFile.toUri().toString()); - TrackerDistributedCacheManager.determineCacheVisibilities(conf2); - TrackerDistributedCacheManager.determineTimestamps(conf2); + TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2); Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(fourthCacheFile), false, @@ -1100,8 +1098,7 @@ public class TestTrackerDistributedCache Configuration subConf = new Configuration(myConf); subConf.set("user.name", userName); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); - TrackerDistributedCacheManager.determineTimestamps(subConf); - TrackerDistributedCacheManager.determineCacheVisibilities(subConf); + TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(subConf); // ****** End of imitating JobClient code // ****** Imitate TaskRunner code. @@ -1150,8 +1147,7 @@ public class TestTrackerDistributedCache Configuration subConf2 = new Configuration(myConf); subConf2.set("user.name", userName); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2); - TrackerDistributedCacheManager.determineTimestamps(subConf2); - TrackerDistributedCacheManager.determineCacheVisibilities(subConf2); + TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(subConf2); handle = manager.newTaskDistributedCacheManager(new JobID("jt", 2), subConf2);