Author: tucu
Date: Thu Jan 10 00:55:11 2013
New Revision: 1431168

URL: http://svn.apache.org/viewvc?rev=1431168&view=rev
Log:
MAPREDUCE-4907. TrackerDistributedCacheManager issues too many getFileStatus 
calls. (sandyr via tucu)

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java
    
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1431168&r1=1431167&r2=1431168&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Thu Jan 10 00:55:11 2013
@@ -154,6 +154,9 @@ Release 1.2.0 - unreleased
     HDFS-4320. Add a separate configuration for namenode rpc address instead
     of using fs.default.name. (Mostafa Elhemali via suresh)
 
+    MAPREDUCE-4907. TrackerDistributedCacheManager issues too many 
+    getFileStatus calls. (sandyr via tucu)
+
   OPTIMIZATIONS
 
     HDFS-2533. Backport: Remove needless synchronization on some FSDataSet

Modified: 
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1431168&r1=1431167&r2=1431168&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
 (original)
+++ 
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
 Thu Jan 10 00:55:11 2013
@@ -320,14 +320,15 @@ public class TrackerDistributedCacheMana
    * @return true if the path in the uri is visible to all, false otherwise
    * @throws IOException
    */
-  static boolean isPublic(Configuration conf, URI uri) throws IOException {
+  static boolean isPublic(Configuration conf, URI uri,
+      Map<URI, FileStatus> statCache) throws IOException {
     FileSystem fs = FileSystem.get(uri, conf);
     Path current = new Path(uri.getPath());
     //the leaf level file should be readable by others
-    if (!checkPermissionOfOther(fs, current, FsAction.READ)) {
+    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
       return false;
     }
-    return ancestorsHaveExecutePermissions(fs, current.getParent());
+    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
   }
 
   /**
@@ -335,12 +336,12 @@ public class TrackerDistributedCacheMana
    * permission set for all users (i.e. that other users can traverse
    * the directory heirarchy to the given path)
    */
-  static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path)
-    throws IOException {
+  static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path,
+      Map<URI, FileStatus> statCache) throws IOException {
     Path current = path;
     while (current != null) {
       //the subdirs in the path should have execute permissions for others
-      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
         return false;
       }
       current = current.getParent();
@@ -358,8 +359,8 @@ public class TrackerDistributedCacheMana
    * @throws IOException
    */
   private static boolean checkPermissionOfOther(FileSystem fs, Path path,
-      FsAction action) throws IOException {
-    FileStatus status = fs.getFileStatus(path);
+      FsAction action, Map<URI, FileStatus> statCache) throws IOException {
+    FileStatus status = getFileStatus(fs, path, statCache);
     FsPermission perms = status.getPermission();
     FsAction otherAction = perms.getOtherAction();
     if (otherAction.implies(action)) {
@@ -712,25 +713,69 @@ public class TrackerDistributedCacheMana
   }
 
   /**
+   * Gets the file status for the given URI.  If the URI is in the cache,
+   * returns it.  Otherwise, fetches it and adds it to the cache.
+   */
+  private static FileStatus getFileStatus(Configuration job, URI uri,
+      Map<URI, FileStatus> statCache) throws IOException {
+    FileStatus stat = statCache.get(uri);
+    if (stat == null) {
+      stat = DistributedCache.getFileStatus(job, uri);
+      statCache.put(uri, stat);
+    }
+    return stat;
+  }
+  
+  private static FileStatus getFileStatus(FileSystem fs, Path path,
+      Map<URI, FileStatus> statCache) throws IOException {
+    URI uri = path.toUri();
+    FileStatus stat = statCache.get(uri);
+    if (stat == null) {
+      stat = fs.getFileStatus(path);
+      statCache.put(uri, stat);
+    }
+    return stat;
+  }
+  
+  /**
    * Determines timestamps of files to be cached, and stores those
-   * in the configuration.  This is intended to be used internally by JobClient
-   * after all cache files have been added.
+   * in the configuration. Determines the visibilities of the distributed cache
+   * files and archives. The visibility of a cache path is "public" if the leaf
+   * component has READ permissions for others, and the parent subdirs have 
+   * EXECUTE permissions for others.
    * 
    * This is an internal method!
    * 
+   * @param job
+   * @throws IOException
+   */
+  public static void determineTimestampsAndCacheVisibilities(Configuration job)
+  throws IOException {
+    Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
+    determineTimestamps(job, statCache);
+    determineCacheVisibilities(job, statCache);
+  }
+  
+  /**
+   * Determines timestamps of files to be cached, and stores those
+   * in the configuration.
+   * 
    * @param job Configuration of a job.
+   * @param statCache a cache of FileStatuses so that redundant remote
+   *    calls can be avoided
    * @throws IOException
    */
-  public static void determineTimestamps(Configuration job) throws IOException 
{
+  static void determineTimestamps(Configuration job,
+      Map<URI, FileStatus> statCache) throws IOException {
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     if (tarchives != null) {
-      FileStatus status = DistributedCache.getFileStatus(job, tarchives[0]);
+      FileStatus status = getFileStatus(job, tarchives[0], statCache);
       StringBuffer archiveFileSizes = 
         new StringBuffer(String.valueOf(status.getLen()));      
       StringBuffer archiveTimestamps = 
         new StringBuffer(String.valueOf(status.getModificationTime()));
       for (int i = 1; i < tarchives.length; i++) {
-        status = DistributedCache.getFileStatus(job, tarchives[i]);
+        status = getFileStatus(job, tarchives[i], statCache);
         archiveFileSizes.append(",");
         archiveFileSizes.append(String.valueOf(status.getLen()));
         archiveTimestamps.append(",");
@@ -744,7 +789,7 @@ public class TrackerDistributedCacheMana
   
     URI[] tfiles = DistributedCache.getCacheFiles(job);
     if (tfiles != null) {
-      FileStatus status = DistributedCache.getFileStatus(job, tfiles[0]);
+      FileStatus status = getFileStatus(job, tfiles[0], statCache);
       StringBuffer fileSizes = 
         new StringBuffer(String.valueOf(status.getLen()));      
       StringBuffer fileTimestamps = new StringBuffer(String.valueOf(
@@ -766,27 +811,29 @@ public class TrackerDistributedCacheMana
    * has READ permissions for others, and the parent subdirs have 
    * EXECUTE permissions for others
    * @param job
+   * @param statCache a cache of FileStatuses so that redundant remote
+   *    calls can be avoided
    * @throws IOException
    */
-  public static void determineCacheVisibilities(Configuration job) 
-  throws IOException {
+  static void determineCacheVisibilities(Configuration job,
+      Map<URI, FileStatus> statCache) throws IOException {
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     if (tarchives != null) {
       StringBuffer archiveVisibilities = 
-        new StringBuffer(String.valueOf(isPublic(job, tarchives[0])));
+        new StringBuffer(String.valueOf(isPublic(job, tarchives[0], 
statCache)));
       for (int i = 1; i < tarchives.length; i++) {
         archiveVisibilities.append(",");
-        archiveVisibilities.append(String.valueOf(isPublic(job, 
tarchives[i])));
+        archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i], 
statCache)));
       }
       setArchiveVisibilities(job, archiveVisibilities.toString());
     }
     URI[] tfiles = DistributedCache.getCacheFiles(job);
     if (tfiles != null) {
       StringBuffer fileVisibilities = 
-        new StringBuffer(String.valueOf(isPublic(job, tfiles[0])));
+        new StringBuffer(String.valueOf(isPublic(job, tfiles[0], statCache)));
       for (int i = 1; i < tfiles.length; i++) {
         fileVisibilities.append(",");
-        fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i])));
+        fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i], 
statCache)));
       }
       setFileVisibilities(job, fileVisibilities.toString());
     }

Modified: 
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1431168&r1=1431167&r2=1431168&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java
 (original)
+++ 
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java
 Thu Jan 10 00:55:11 2013
@@ -827,10 +827,9 @@ public class JobClient extends Configure
     
     // First we check whether the cached archives and files are legal.
     TrackerDistributedCacheManager.validate(job);
-    //  set the timestamps of the archives and files
-    TrackerDistributedCacheManager.determineTimestamps(job);
-    //  set the public/private visibility of the archives and files
-    TrackerDistributedCacheManager.determineCacheVisibilities(job);
+    //  set the timestamps of the archives and files and set the
+    //  public/private visibility of the archives and files
+    
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job);
     // get DelegationTokens for cache files
     TrackerDistributedCacheManager.getDelegationTokens(job, 
                                                        job.getCredentials());

Modified: 
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1431168&r1=1431167&r2=1431168&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
 (original)
+++ 
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
 Thu Jan 10 00:55:11 2013
@@ -24,6 +24,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -108,7 +110,7 @@ public class TestTrackerDistributedCache
     assertTrue("Test root directory " + TEST_ROOT + " and all of its " +
                "parent directories must have a+x permissions",
                TrackerDistributedCacheManager.ancestorsHaveExecutePermissions(
-                 fs, new Path(TEST_ROOT.toString())));
+                 fs, new Path(TEST_ROOT.toString()), new HashMap<URI, 
FileStatus>()));
 
     // Prepare the tests' mapred-local-dir
     ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
@@ -187,8 +189,11 @@ public class TestTrackerDistributedCache
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     DistributedCache.addFileToClassPath(secondCacheFile, subConf, 
                                         FileSystem.get(subConf));
-    TrackerDistributedCacheManager.determineTimestamps(subConf);
-    TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
+
+    Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
+    TrackerDistributedCacheManager.determineTimestamps(subConf, statCache);
+    TrackerDistributedCacheManager.determineCacheVisibilities(subConf, 
statCache);
+    assertEquals(2, statCache.size());
     // ****** End of imitating JobClient code
 
     Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
@@ -273,8 +278,7 @@ public class TestTrackerDistributedCache
     conf1.set("user.name", userName);
     DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
     
-    TrackerDistributedCacheManager.determineTimestamps(conf1);
-    TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
+    
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
 
     // Task localizing for first job
     JobID jobId = new JobID("jt", 1);
@@ -302,8 +306,7 @@ public class TestTrackerDistributedCache
     DistributedCache.addCacheFile(firstCacheDirPublic.toUri(), conf2);
     DistributedCache.addCacheFile(firstCacheDirPrivate.toUri(), conf2);
 
-    TrackerDistributedCacheManager.determineTimestamps(conf2);
-    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+    
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2);
 
     // Task localizing for second job
     JobID job2Id = new JobID("jt", 2);
@@ -339,8 +342,7 @@ public class TestTrackerDistributedCache
     // add a file that is never localized
     DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf3);
     
-    TrackerDistributedCacheManager.determineTimestamps(conf3);
-    TrackerDistributedCacheManager.determineCacheVisibilities(conf3);
+    
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf3);
 
     // Task localizing for third job
     // localization for the "firstCacheFile" will fail.
@@ -379,7 +381,7 @@ public class TestTrackerDistributedCache
    * @throws LoginException
    */
   public void testPublicPrivateCache() 
-  throws IOException, LoginException, InterruptedException {
+  throws IOException, LoginException, InterruptedException, URISyntaxException 
{
     if (!canRun()) {
       return;
     }
@@ -404,8 +406,7 @@ public class TestTrackerDistributedCache
 
     DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
     DistributedCache.addCacheArchive(cacheFile.toUri(), conf1);
-    TrackerDistributedCacheManager.determineTimestamps(conf1);
-    TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
+    
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
     dumpState(conf1);
 
     TaskDistributedCacheManager handle = manager
@@ -491,7 +492,7 @@ public class TestTrackerDistributedCache
   }
   
   private void checkLocalizedPath(boolean visibility) 
-  throws IOException, LoginException, InterruptedException {
+  throws IOException, LoginException, InterruptedException, URISyntaxException 
{
     TrackerDistributedCacheManager manager = 
       new TrackerDistributedCacheManager(conf, taskController);
     String userName = getJobOwnerName();
@@ -506,8 +507,7 @@ public class TestTrackerDistributedCache
     Configuration conf1 = new Configuration(conf);
     conf1.set("user.name", userName);
     DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
-    TrackerDistributedCacheManager.determineTimestamps(conf1);
-    TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
+    
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
     dumpState(conf1);
 
     // Task localizing for job
@@ -894,8 +894,7 @@ public class TestTrackerDistributedCache
     createPrivateTempFile(thirdCacheFile);
     createPrivateTempFile(fourthCacheFile);
     DistributedCache.setCacheFiles(new URI[]{thirdCacheFile.toUri()}, conf2);
-    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
-    TrackerDistributedCacheManager.determineTimestamps(conf2);
+    
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2);
     stat = fs.getFileStatus(thirdCacheFile);
     CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(), 
             CacheFile.FileType.REGULAR, false, 
@@ -922,8 +921,7 @@ public class TestTrackerDistributedCache
     
     DistributedCache.setCacheFiles(new URI[]{fourthCacheFile.toUri()}, conf2);
     DistributedCache.setLocalFiles(conf2, thirdCacheFile.toUri().toString());
-    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
-    TrackerDistributedCacheManager.determineTimestamps(conf2);
+    
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2);
     Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), 
conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(fourthCacheFile), false, 
@@ -1100,8 +1098,7 @@ public class TestTrackerDistributedCache
     Configuration subConf = new Configuration(myConf);
     subConf.set("user.name", userName);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
-    TrackerDistributedCacheManager.determineTimestamps(subConf);
-    TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
+    
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(subConf);
     // ****** End of imitating JobClient code
 
     // ****** Imitate TaskRunner code.
@@ -1150,8 +1147,7 @@ public class TestTrackerDistributedCache
     Configuration subConf2 = new Configuration(myConf);
     subConf2.set("user.name", userName);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
-    TrackerDistributedCacheManager.determineTimestamps(subConf2);
-    TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
+    
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(subConf2);
     
     handle =
       manager.newTaskDistributedCacheManager(new JobID("jt", 2), subConf2);


Reply via email to