Repository: hadoop
Updated Branches:
  refs/heads/trunk 9560f252c -> ce9c00643
YARN-5214. Fixed locking in DirectoryCollection to avoid hanging NMs when various code-paths hit slow disks. Contributed by Junping Du.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ce9c0064
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ce9c0064
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ce9c0064

Branch: refs/heads/trunk
Commit: ce9c006430d13a28bc1ca57c5c70cc1b7cba1692
Parents: 9560f25
Author: Vinod Kumar Vavilapalli <vino...@apache.org>
Authored: Tue Jul 5 16:07:28 2016 -0700
Committer: Vinod Kumar Vavilapalli <vino...@apache.org>
Committed: Tue Jul 5 16:07:28 2016 -0700

----------------------------------------------------------------------
 .../server/nodemanager/DirectoryCollection.java | 221 ++++++++++++-------
 1 file changed, 145 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
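The essence of the patch, for readers who do not want to walk the whole diff below: the old code held the object monitor (synchronized) across disk IO, so one hung disk stalled every caller that touched the directory lists. The new code snapshots state under a short read lock, probes the disks with no lock held, and publishes the results under the write lock. Here is a minimal, self-contained sketch of that pattern; the class name and the probeDisks() helper are illustrative stand-ins, not part of the commit:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative sketch only; not part of the commit.
    class DirListSketch {
      private final List<String> goodDirs = new CopyOnWriteArrayList<>();
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      boolean check() {
        // 1. Snapshot shared state under the read lock. Readers do not
        // block each other, and the critical section stays short.
        List<String> snapshot = null;
        lock.readLock().lock();
        try {
          snapshot = new ArrayList<>(goodDirs);
        } finally {
          lock.readLock().unlock();
        }

        // 2. Do the potentially slow disk probing with no lock held, so a
        // hung disk cannot stall every other caller of this object.
        List<String> stillGood = probeDisks(snapshot);

        // 3. Publish the result under the write lock; only this short
        // mutation excludes readers.
        lock.writeLock().lock();
        try {
          boolean changed = !goodDirs.equals(stillGood);
          goodDirs.clear();
          goodDirs.addAll(stillGood);
          return changed;
        } finally {
          lock.writeLock().unlock();
        }
      }

      // Stand-in for DirectoryCollection.testDirs(); assume it may block on IO.
      private List<String> probeDisks(List<String> dirs) {
        return new ArrayList<>(dirs);
      }
    }

checkDirs() in the diff follows exactly this shape: the testDirs() call sits between the read-locked snapshot and the write-locked update, which is what keeps a slow disk from hanging the NM.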
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce9c0064/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
index a2bfd20..417b207 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
@@ -28,12 +28,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -89,6 +94,10 @@ public class DirectoryCollection {
   private List<String> errorDirs;
   private List<String> fullDirs;
 
+  // read/write lock for accessing above directories.
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+
   private int numFailures;
 
   private float diskUtilizationPercentageCutoffHigh;
@@ -163,9 +172,13 @@ public class DirectoryCollection {
       float utilizationPercentageCutOffHigh,
       float utilizationPercentageCutOffLow,
       long utilizationSpaceCutOff) {
-    localDirs = new CopyOnWriteArrayList<String>(dirs);
-    errorDirs = new CopyOnWriteArrayList<String>();
-    fullDirs = new CopyOnWriteArrayList<String>();
+    localDirs = new CopyOnWriteArrayList<>(dirs);
+    errorDirs = new CopyOnWriteArrayList<>();
+    fullDirs = new CopyOnWriteArrayList<>();
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
 
     diskUtilizationPercentageCutoffHigh = Math.max(0.0F,
         Math.min(100.0F, utilizationPercentageCutOffHigh));
@@ -174,17 +187,18 @@ public class DirectoryCollection {
     diskUtilizationSpaceCutoff =
         utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff;
 
-    dirsChangeListeners = new HashSet<DirsChangeListener>();
+    dirsChangeListeners = Collections.newSetFromMap(
+        new ConcurrentHashMap<DirsChangeListener, Boolean>());
   }
 
-  synchronized void registerDirsChangeListener(
+  void registerDirsChangeListener(
       DirsChangeListener listener) {
     if (dirsChangeListeners.add(listener)) {
       listener.onDirsChanged();
     }
   }
 
-  synchronized void deregisterDirsChangeListener(
+  void deregisterDirsChangeListener(
      DirsChangeListener listener) {
     dirsChangeListeners.remove(listener);
   }
@@ -192,31 +206,51 @@ public class DirectoryCollection {
   /**
    * @return the current valid directories
    */
-  synchronized List<String> getGoodDirs() {
-    return Collections.unmodifiableList(localDirs);
+  List<String> getGoodDirs() {
+    this.readLock.lock();
+    try {
+      return Collections.unmodifiableList(localDirs);
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * @return the failed directories
    */
-  synchronized List<String> getFailedDirs() {
-    return Collections.unmodifiableList(
-        DirectoryCollection.concat(errorDirs, fullDirs));
+  List<String> getFailedDirs() {
+    this.readLock.lock();
+    try {
+      return Collections.unmodifiableList(
+          DirectoryCollection.concat(errorDirs, fullDirs));
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * @return the directories that have used all disk space
    */
-  synchronized List<String> getFullDirs() {
-    return fullDirs;
+  List<String> getFullDirs() {
+    this.readLock.lock();
+    try {
+      return fullDirs;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * @return total the number of directory failures seen till now
    */
-  synchronized int getNumFailures() {
-    return numFailures;
+  int getNumFailures() {
+    this.readLock.lock();
+    try {
+      return numFailures;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
@@ -226,18 +260,30 @@ public class DirectoryCollection {
    * @param perm absolute permissions to use for any directories created
    * @return true if there were no errors, false if at least one error occurred
    */
-  synchronized boolean createNonExistentDirs(FileContext localFs,
+  boolean createNonExistentDirs(FileContext localFs,
       FsPermission perm) {
     boolean failed = false;
-    for (final String dir : localDirs) {
+    List<String> localDirectories = null;
+    this.readLock.lock();
+    try {
+      localDirectories = new ArrayList<>(localDirs);
+    } finally {
+      this.readLock.unlock();
+    }
+    for (final String dir : localDirectories) {
      try {
        createDir(localFs, new Path(dir), perm);
      } catch (IOException e) {
        LOG.warn("Unable to create directory " + dir + " error "
            + e.getMessage() + ", removing from the list of valid directories.");
directories."); - localDirs.remove(dir); - errorDirs.add(dir); - numFailures++; + this.writeLock.lock(); + try { + localDirs.remove(dir); + errorDirs.add(dir); + numFailures++; + } finally { + this.writeLock.unlock(); + } failed = true; } } @@ -252,74 +298,93 @@ public class DirectoryCollection { * checking or a failed directory passes the disk check <em>false</em> * otherwise. */ - synchronized boolean checkDirs() { + boolean checkDirs() { boolean setChanged = false; - Set<String> preCheckGoodDirs = new HashSet<String>(localDirs); - Set<String> preCheckFullDirs = new HashSet<String>(fullDirs); - Set<String> preCheckOtherErrorDirs = new HashSet<String>(errorDirs); - List<String> failedDirs = DirectoryCollection.concat(errorDirs, fullDirs); - List<String> allLocalDirs = - DirectoryCollection.concat(localDirs, failedDirs); + Set<String> preCheckGoodDirs = null; + Set<String> preCheckFullDirs = null; + Set<String> preCheckOtherErrorDirs = null; + List<String> failedDirs = null; + List<String> allLocalDirs = null; + this.readLock.lock(); + try { + preCheckGoodDirs = new HashSet<String>(localDirs); + preCheckFullDirs = new HashSet<String>(fullDirs); + preCheckOtherErrorDirs = new HashSet<String>(errorDirs); + failedDirs = DirectoryCollection.concat(errorDirs, fullDirs); + allLocalDirs = DirectoryCollection.concat(localDirs, failedDirs); + } finally { + this.readLock.unlock(); + } + // move testDirs out of any lock as it could wait for very long time in + // case of busy IO Map<String, DiskErrorInformation> dirsFailedCheck = testDirs(allLocalDirs, preCheckGoodDirs); - localDirs.clear(); - errorDirs.clear(); - fullDirs.clear(); - - for (Map.Entry<String, DiskErrorInformation> entry : dirsFailedCheck - .entrySet()) { - String dir = entry.getKey(); - DiskErrorInformation errorInformation = entry.getValue(); - switch (entry.getValue().cause) { - case DISK_FULL: - fullDirs.add(entry.getKey()); - break; - case OTHER: - errorDirs.add(entry.getKey()); - break; - } - if (preCheckGoodDirs.contains(dir)) { - LOG.warn("Directory " + dir + " error, " + errorInformation.message - + ", removing from list of valid directories"); - setChanged = true; - numFailures++; - } - } - for (String dir : allLocalDirs) { - if (!dirsFailedCheck.containsKey(dir)) { - localDirs.add(dir); - if (preCheckFullDirs.contains(dir) - || preCheckOtherErrorDirs.contains(dir)) { + this.writeLock.lock(); + try { + localDirs.clear(); + errorDirs.clear(); + fullDirs.clear(); + + for (Map.Entry<String, DiskErrorInformation> entry : dirsFailedCheck + .entrySet()) { + String dir = entry.getKey(); + DiskErrorInformation errorInformation = entry.getValue(); + switch (entry.getValue().cause) { + case DISK_FULL: + fullDirs.add(entry.getKey()); + break; + case OTHER: + errorDirs.add(entry.getKey()); + break; + default: + LOG.warn(entry.getValue().cause + " is unknown for disk error."); + break; + } + if (preCheckGoodDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error, " + errorInformation.message + + ", removing from list of valid directories"); setChanged = true; - LOG.info("Directory " + dir - + " passed disk check, adding to list of valid directories."); + numFailures++; } } - } - Set<String> postCheckFullDirs = new HashSet<String>(fullDirs); - Set<String> postCheckOtherDirs = new HashSet<String>(errorDirs); - for (String dir : preCheckFullDirs) { - if (postCheckOtherDirs.contains(dir)) { - LOG.warn("Directory " + dir + " error " - + dirsFailedCheck.get(dir).message); + for (String dir : allLocalDirs) { + if 
+        if (!dirsFailedCheck.containsKey(dir)) {
+          localDirs.add(dir);
+          if (preCheckFullDirs.contains(dir)
+              || preCheckOtherErrorDirs.contains(dir)) {
+            setChanged = true;
+            LOG.info("Directory " + dir
+                + " passed disk check, adding to list of valid directories.");
+          }
+        }
+      }
+      Set<String> postCheckFullDirs = new HashSet<String>(fullDirs);
+      Set<String> postCheckOtherDirs = new HashSet<String>(errorDirs);
+      for (String dir : preCheckFullDirs) {
+        if (postCheckOtherDirs.contains(dir)) {
+          LOG.warn("Directory " + dir + " error "
+              + dirsFailedCheck.get(dir).message);
+        }
      }
-    }
-    for (String dir : preCheckOtherErrorDirs) {
-      if (postCheckFullDirs.contains(dir)) {
-        LOG.warn("Directory " + dir + " error "
-            + dirsFailedCheck.get(dir).message);
+      for (String dir : preCheckOtherErrorDirs) {
+        if (postCheckFullDirs.contains(dir)) {
+          LOG.warn("Directory " + dir + " error "
+              + dirsFailedCheck.get(dir).message);
+        }
      }
-    }
-    setGoodDirsDiskUtilizationPercentage();
-    if (setChanged) {
-      for (DirsChangeListener listener : dirsChangeListeners) {
-        listener.onDirsChanged();
+      setGoodDirsDiskUtilizationPercentage();
+      if (setChanged) {
+        for (DirsChangeListener listener : dirsChangeListeners) {
+          listener.onDirsChanged();
+        }
      }
+      return setChanged;
+    } finally {
+      this.writeLock.unlock();
    }
-    return setChanged;
   }
 
   Map<String, DiskErrorInformation> testDirs(List<String> dirs,
@@ -409,7 +474,11 @@ public class DirectoryCollection {
       localFs.getFileStatus(dir);
     } catch (FileNotFoundException e) {
       createDir(localFs, dir.getParent(), perm);
-      localFs.mkdir(dir, perm, false);
+      try {
+        localFs.mkdir(dir, perm, false);
+      } catch (FileAlreadyExistsException ex) {
+        // do nothing as other threads could be creating the same directory.
+      }
       if (!perm.equals(perm.applyUMask(localFs.getUMask()))) {
         localFs.setPermission(dir, perm);
       }
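The final hunk above handles a side effect of dropping the coarse synchronization: two threads can now race to create the same directory, so an mkdir that loses the race must be treated as success rather than failure. The patch does this by catching org.apache.hadoop.fs.FileAlreadyExistsException around localFs.mkdir(). A sketch of the same idempotent-create idea, written here with java.nio rather than Hadoop's FileContext, with a hypothetical path:

    import java.io.IOException;
    import java.nio.file.FileAlreadyExistsException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    final class MkdirRaceSketch {
      // Create a directory, treating "already exists" as success: another
      // thread or process winning the creation race is the outcome we
      // wanted anyway, not an error.
      static void ensureDir(Path dir) throws IOException {
        try {
          Files.createDirectory(dir);
        } catch (FileAlreadyExistsException e) {
          // Do nothing; a concurrent creator got there first.
        }
      }

      public static void main(String[] args) throws IOException {
        ensureDir(Paths.get("/tmp/nm-local-dir-sketch")); // hypothetical path
      }
    }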