Author: tgraves
Date: Tue Sep 11 14:34:27 2012
New Revision: 1383430

URL: http://svn.apache.org/viewvc?rev=1383430&view=rev
Log:
MAPREDUCE-4576. Large dist cache can block tasktracker heartbeat (Robert Evans via tgraves).

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1383430&r1=1383429&r2=1383430&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Sep 11 14:34:27 2012
@@ -233,6 +233,9 @@ Release 1.2.0 - unreleased
 
     HADOOP-8781. hadoop-config.sh should add JAVA_LIBRARY_PATH to LD_LIBRARY_PATH. (tucu)
 
+    MAPREDUCE-4576. Large dist cache can block tasktracker heartbeat
+    (Robert Evans via tgraves)
+
 Release 1.1.0 - unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1383430&r1=1383429&r2=1383430&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Tue Sep 11 14:34:27 2012
@@ -35,6 +35,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -566,7 +567,8 @@ public class TrackerDistributedCacheMana
     //
     long size;              //the size of this cache.
     boolean inited = false; // is it initialized ?
-    
+    private final ReentrantLock lock = new ReentrantLock();
+ 
     //
     // The following five fields are Immutable.
     //
@@ -598,14 +600,20 @@ public class TrackerDistributedCacheMana
       this.key = key;
     }
     
-    public synchronized void incRefCount() {
-      refcount.incrementAndGet() ;
-      LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
+    public void incRefCount() {
+      lock.lock();
+      try {
+        refcount.incrementAndGet() ;
+        LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
+      } finally {
+        lock.unlock();
+      }
     }
 
     public void decRefCount() {
       synchronized (cachedArchives) {
-        synchronized (this) {
+        lock.lock();
+        try {
           refcount.decrementAndGet() ;
           LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
           if(refcount.get() <= 0) {
@@ -613,6 +621,8 @@ public class TrackerDistributedCacheMana
             cachedArchives.remove(key);
             cachedArchives.put(key, this);
           }
+        } finally {
+          lock.unlock();
         }
       }
     }
@@ -621,9 +631,14 @@ public class TrackerDistributedCacheMana
       return refcount.get();
     }
 
-    public synchronized boolean isUsed() {
-      LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
-      return refcount.get() > 0;
+    public boolean isUsed() {
+      lock.lock();
+      try { 
+        LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
+        return refcount.get() > 0;
+      } finally {
+        lock.unlock();
+      }
     }
 
     Path getBaseDir(){
@@ -1027,19 +1042,24 @@ public class TrackerDistributedCacheMana
           CacheDir leftToClean = toBeCleanedBaseDir.get(cacheStatus.getBaseDir());
 
           if (leftToClean != null && (leftToClean.size > 0 || leftToClean.subdirs > 0)) {
-            synchronized (cacheStatus) {
-              // if reference count is zero mark the cache for deletion
-              boolean isUsed = cacheStatus.isUsed();
-              long cacheSize = cacheStatus.size; 
-              LOG.debug(cacheStatus.getLocalizedUniqueDir() + ": isUsed=" + isUsed + 
-                  " size=" + cacheSize + " leftToClean.size=" + leftToClean.size);
-              if (!isUsed) {
-                leftToClean.size -= cacheSize;
-                leftToClean.subdirs--;
-                // delete this cache entry from the global list 
-                // and mark the localized file for deletion
-                toBeDeletedCache.add(cacheStatus);
-                it.remove();
+            boolean gotLock = cacheStatus.lock.tryLock();
+            if (gotLock) {
+              try {
+                // if reference count is zero mark the cache for deletion
+                boolean isUsed = cacheStatus.isUsed();
+                long cacheSize = cacheStatus.size; 
+                LOG.debug(cacheStatus.getLocalizedUniqueDir() + ": isUsed=" + isUsed + 
+                    " size=" + cacheSize + " leftToClean.size=" + leftToClean.size);
+                if (!isUsed) {
+                  leftToClean.size -= cacheSize;
+                  leftToClean.subdirs--;
+                  // delete this cache entry from the global list 
+                  // and mark the localized file for deletion
+                  toBeDeletedCache.add(cacheStatus);
+                  it.remove();
+                }
+              } finally {
+                cacheStatus.lock.unlock();
               }
             }
           }
@@ -1048,7 +1068,8 @@ public class TrackerDistributedCacheMana
       
       // do the deletion, after releasing the global lock
       for (CacheStatus cacheStatus : toBeDeletedCache) {
-        synchronized (cacheStatus) {
+        cacheStatus.lock.lock();
+        try {
           Path localizedDir = cacheStatus.getLocalizedUniqueDir();
           if (cacheStatus.user == null) {
             TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir);
@@ -1067,6 +1088,8 @@ public class TrackerDistributedCacheMana
             taskController.deleteAsUser(cacheStatus.user, relative);
           }
           deleteCacheInfoUpdate(cacheStatus);
+        } finally {
+          cacheStatus.lock.unlock();
         }
       }
     }


Reply via email to