Author: tgraves
Date: Tue Sep 11 14:34:27 2012
New Revision: 1383430
URL: http://svn.apache.org/viewvc?rev=1383430&view=rev
Log:
MAPREDUCE-4576. Large dist cache can block tasktracker heartbeat (Robert Evans via tgraves).
Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1383430&r1=1383429&r2=1383430&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Tue Sep 11 14:34:27 2012 @@ -233,6 +233,9 @@ Release 1.2.0 - unreleased HADOOP-8781. hadoop-config.sh should add JAVA_LIBRARY_PATH to LD_LIBRARY_PATH. (tucu) + MAPREDUCE-4576. Large dist cache can block tasktracker heartbeat + (Robert Evans via tgraves) + Release 1.1.0 - unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1383430&r1=1383429&r2=1383430&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Tue Sep 11 14:34:27 2012 @@ -35,6 +35,7 @@ import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -566,7 +567,8 @@ public class TrackerDistributedCacheMana // long size; //the size of this cache. boolean inited = false; // is it initialized ? - + private final ReentrantLock lock = new ReentrantLock(); + // // The following five fields are Immutable. 
// @@ -598,14 +600,20 @@ public class TrackerDistributedCacheMana this.key = key; } - public synchronized void incRefCount() { - refcount.incrementAndGet() ; - LOG.debug(localizedLoadPath + ": refcount=" + refcount.get()); + public void incRefCount() { + lock.lock(); + try { + refcount.incrementAndGet() ; + LOG.debug(localizedLoadPath + ": refcount=" + refcount.get()); + } finally { + lock.unlock(); + } } public void decRefCount() { synchronized (cachedArchives) { - synchronized (this) { + lock.lock(); + try { refcount.decrementAndGet() ; LOG.debug(localizedLoadPath + ": refcount=" + refcount.get()); if(refcount.get() <= 0) { @@ -613,6 +621,8 @@ public class TrackerDistributedCacheMana cachedArchives.remove(key); cachedArchives.put(key, this); } + } finally { + lock.unlock(); } } } @@ -621,9 +631,14 @@ public class TrackerDistributedCacheMana return refcount.get(); } - public synchronized boolean isUsed() { - LOG.debug(localizedLoadPath + ": refcount=" + refcount.get()); - return refcount.get() > 0; + public boolean isUsed() { + lock.lock(); + try { + LOG.debug(localizedLoadPath + ": refcount=" + refcount.get()); + return refcount.get() > 0; + } finally { + lock.unlock(); + } } Path getBaseDir(){ @@ -1027,19 +1042,24 @@ public class TrackerDistributedCacheMana CacheDir leftToClean = toBeCleanedBaseDir.get(cacheStatus.getBaseDir()); if (leftToClean != null && (leftToClean.size > 0 || leftToClean.subdirs > 0)) { - synchronized (cacheStatus) { - // if reference count is zero mark the cache for deletion - boolean isUsed = cacheStatus.isUsed(); - long cacheSize = cacheStatus.size; - LOG.debug(cacheStatus.getLocalizedUniqueDir() + ": isUsed=" + isUsed + - " size=" + cacheSize + " leftToClean.size=" + leftToClean.size); - if (!isUsed) { - leftToClean.size -= cacheSize; - leftToClean.subdirs--; - // delete this cache entry from the global list - // and mark the localized file for deletion - toBeDeletedCache.add(cacheStatus); - it.remove(); + boolean gotLock = 
cacheStatus.lock.tryLock(); + if (gotLock) { + try { + // if reference count is zero mark the cache for deletion + boolean isUsed = cacheStatus.isUsed(); + long cacheSize = cacheStatus.size; + LOG.debug(cacheStatus.getLocalizedUniqueDir() + ": isUsed=" + isUsed + + " size=" + cacheSize + " leftToClean.size=" + leftToClean.size); + if (!isUsed) { + leftToClean.size -= cacheSize; + leftToClean.subdirs--; + // delete this cache entry from the global list + // and mark the localized file for deletion + toBeDeletedCache.add(cacheStatus); + it.remove(); + } + } finally { + cacheStatus.lock.unlock(); } } } @@ -1048,7 +1068,8 @@ public class TrackerDistributedCacheMana // do the deletion, after releasing the global lock for (CacheStatus cacheStatus : toBeDeletedCache) { - synchronized (cacheStatus) { + cacheStatus.lock.lock(); + try { Path localizedDir = cacheStatus.getLocalizedUniqueDir(); if (cacheStatus.user == null) { TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir); @@ -1067,6 +1088,8 @@ public class TrackerDistributedCacheMana taskController.deleteAsUser(cacheStatus.user, relative); } deleteCacheInfoUpdate(cacheStatus); + } finally { + cacheStatus.lock.unlock(); } } }