Repository: hadoop
Updated Branches:
  refs/heads/trunk 9560f252c -> ce9c00643


YARN-5214. Fixed locking in DirectoryCollection to avoid hanging NMs when 
various code-paths hit slow disks. Contributed by Junping Du.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ce9c0064
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ce9c0064
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ce9c0064

Branch: refs/heads/trunk
Commit: ce9c006430d13a28bc1ca57c5c70cc1b7cba1692
Parents: 9560f25
Author: Vinod Kumar Vavilapalli <vino...@apache.org>
Authored: Tue Jul 5 16:07:28 2016 -0700
Committer: Vinod Kumar Vavilapalli <vino...@apache.org>
Committed: Tue Jul 5 16:07:28 2016 -0700

----------------------------------------------------------------------
 .../server/nodemanager/DirectoryCollection.java | 221 ++++++++++++-------
 1 file changed, 145 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce9c0064/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
index a2bfd20..417b207 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
@@ -28,12 +28,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -89,6 +94,10 @@ public class DirectoryCollection {
   private List<String> errorDirs;
   private List<String> fullDirs;
 
+  // read/write lock for accessing above directories.
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+
   private int numFailures;
 
   private float diskUtilizationPercentageCutoffHigh;
@@ -163,9 +172,13 @@ public class DirectoryCollection {
       float utilizationPercentageCutOffHigh,
       float utilizationPercentageCutOffLow,
       long utilizationSpaceCutOff) {
-    localDirs = new CopyOnWriteArrayList<String>(dirs);
-    errorDirs = new CopyOnWriteArrayList<String>();
-    fullDirs = new CopyOnWriteArrayList<String>();
+    localDirs = new CopyOnWriteArrayList<>(dirs);
+    errorDirs = new CopyOnWriteArrayList<>();
+    fullDirs = new CopyOnWriteArrayList<>();
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
 
     diskUtilizationPercentageCutoffHigh = Math.max(0.0F, Math.min(100.0F,
         utilizationPercentageCutOffHigh));
@@ -174,17 +187,18 @@ public class DirectoryCollection {
     diskUtilizationSpaceCutoff =
         utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff;
 
-    dirsChangeListeners = new HashSet<DirsChangeListener>();
+    dirsChangeListeners = Collections.newSetFromMap(
+        new ConcurrentHashMap<DirsChangeListener, Boolean>());
   }
 
-  synchronized void registerDirsChangeListener(
+  void registerDirsChangeListener(
       DirsChangeListener listener) {
     if (dirsChangeListeners.add(listener)) {
       listener.onDirsChanged();
     }
   }
 
-  synchronized void deregisterDirsChangeListener(
+  void deregisterDirsChangeListener(
       DirsChangeListener listener) {
     dirsChangeListeners.remove(listener);
   }
@@ -192,31 +206,51 @@ public class DirectoryCollection {
   /**
    * @return the current valid directories 
    */
-  synchronized List<String> getGoodDirs() {
-    return Collections.unmodifiableList(localDirs);
+  List<String> getGoodDirs() {
+    this.readLock.lock();
+    try {
+      return Collections.unmodifiableList(localDirs);
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * @return the failed directories
    */
-  synchronized List<String> getFailedDirs() {
-    return Collections.unmodifiableList(
-        DirectoryCollection.concat(errorDirs, fullDirs));
+  List<String> getFailedDirs() {
+    this.readLock.lock();
+    try {
+      return Collections.unmodifiableList(
+          DirectoryCollection.concat(errorDirs, fullDirs));
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * @return the directories that have used all disk space
    */
 
-  synchronized List<String> getFullDirs() {
-    return fullDirs;
+  List<String> getFullDirs() {
+    this.readLock.lock();
+    try {
+      return fullDirs;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * @return total the number of directory failures seen till now
    */
-  synchronized int getNumFailures() {
-    return numFailures;
+  int getNumFailures() {
+    this.readLock.lock();
+    try {
+      return numFailures;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
@@ -226,18 +260,30 @@ public class DirectoryCollection {
    * @param perm absolute permissions to use for any directories created
    * @return true if there were no errors, false if at least one error occurred
    */
-  synchronized boolean createNonExistentDirs(FileContext localFs,
+  boolean createNonExistentDirs(FileContext localFs,
       FsPermission perm) {
     boolean failed = false;
-    for (final String dir : localDirs) {
+    List<String> localDirectories = null;
+    this.readLock.lock();
+    try {
+      localDirectories = new ArrayList<>(localDirs);
+    } finally {
+      this.readLock.unlock();
+    }
+    for (final String dir : localDirectories) {
       try {
         createDir(localFs, new Path(dir), perm);
       } catch (IOException e) {
         LOG.warn("Unable to create directory " + dir + " error " +
             e.getMessage() + ", removing from the list of valid directories.");
-        localDirs.remove(dir);
-        errorDirs.add(dir);
-        numFailures++;
+        this.writeLock.lock();
+        try {
+          localDirs.remove(dir);
+          errorDirs.add(dir);
+          numFailures++;
+        } finally {
+          this.writeLock.unlock();
+        }
         failed = true;
       }
     }
@@ -252,74 +298,93 @@ public class DirectoryCollection {
    *         checking or a failed directory passes the disk check 
<em>false</em>
    *         otherwise.
    */
-  synchronized boolean checkDirs() {
+  boolean checkDirs() {
     boolean setChanged = false;
-    Set<String> preCheckGoodDirs = new HashSet<String>(localDirs);
-    Set<String> preCheckFullDirs = new HashSet<String>(fullDirs);
-    Set<String> preCheckOtherErrorDirs = new HashSet<String>(errorDirs);
-    List<String> failedDirs = DirectoryCollection.concat(errorDirs, fullDirs);
-    List<String> allLocalDirs =
-        DirectoryCollection.concat(localDirs, failedDirs);
+    Set<String> preCheckGoodDirs = null;
+    Set<String> preCheckFullDirs = null;
+    Set<String> preCheckOtherErrorDirs = null;
+    List<String> failedDirs = null;
+    List<String> allLocalDirs = null;
+    this.readLock.lock();
+    try {
+      preCheckGoodDirs = new HashSet<String>(localDirs);
+      preCheckFullDirs = new HashSet<String>(fullDirs);
+      preCheckOtherErrorDirs = new HashSet<String>(errorDirs);
+      failedDirs = DirectoryCollection.concat(errorDirs, fullDirs);
+      allLocalDirs = DirectoryCollection.concat(localDirs, failedDirs);
+    } finally {
+      this.readLock.unlock();
+    }
 
+    // move testDirs out of any lock as it could wait for very long time in
+    // case of busy IO
     Map<String, DiskErrorInformation> dirsFailedCheck = testDirs(allLocalDirs,
         preCheckGoodDirs);
 
-    localDirs.clear();
-    errorDirs.clear();
-    fullDirs.clear();
-
-    for (Map.Entry<String, DiskErrorInformation> entry : dirsFailedCheck
-      .entrySet()) {
-      String dir = entry.getKey();
-      DiskErrorInformation errorInformation = entry.getValue();
-      switch (entry.getValue().cause) {
-      case DISK_FULL:
-        fullDirs.add(entry.getKey());
-        break;
-      case OTHER:
-        errorDirs.add(entry.getKey());
-        break;
-      }
-      if (preCheckGoodDirs.contains(dir)) {
-        LOG.warn("Directory " + dir + " error, " + errorInformation.message
-            + ", removing from list of valid directories");
-        setChanged = true;
-        numFailures++;
-      }
-    }
-    for (String dir : allLocalDirs) {
-      if (!dirsFailedCheck.containsKey(dir)) {
-        localDirs.add(dir);
-        if (preCheckFullDirs.contains(dir)
-            || preCheckOtherErrorDirs.contains(dir)) {
+    this.writeLock.lock();
+    try {
+      localDirs.clear();
+      errorDirs.clear();
+      fullDirs.clear();
+
+      for (Map.Entry<String, DiskErrorInformation> entry : dirsFailedCheck
+          .entrySet()) {
+        String dir = entry.getKey();
+        DiskErrorInformation errorInformation = entry.getValue();
+        switch (entry.getValue().cause) {
+        case DISK_FULL:
+          fullDirs.add(entry.getKey());
+          break;
+        case OTHER:
+          errorDirs.add(entry.getKey());
+          break;
+        default:
+          LOG.warn(entry.getValue().cause + " is unknown for disk error.");
+          break;
+        }
+        if (preCheckGoodDirs.contains(dir)) {
+          LOG.warn("Directory " + dir + " error, " + errorInformation.message
+              + ", removing from list of valid directories");
           setChanged = true;
-          LOG.info("Directory " + dir
-              + " passed disk check, adding to list of valid directories.");
+          numFailures++;
         }
       }
-    }
-    Set<String> postCheckFullDirs = new HashSet<String>(fullDirs);
-    Set<String> postCheckOtherDirs = new HashSet<String>(errorDirs);
-    for (String dir : preCheckFullDirs) {
-      if (postCheckOtherDirs.contains(dir)) {
-        LOG.warn("Directory " + dir + " error "
-            + dirsFailedCheck.get(dir).message);
+      for (String dir : allLocalDirs) {
+        if (!dirsFailedCheck.containsKey(dir)) {
+          localDirs.add(dir);
+          if (preCheckFullDirs.contains(dir)
+              || preCheckOtherErrorDirs.contains(dir)) {
+            setChanged = true;
+            LOG.info("Directory " + dir
+                + " passed disk check, adding to list of valid directories.");
+          }
+        }
+      }
+      Set<String> postCheckFullDirs = new HashSet<String>(fullDirs);
+      Set<String> postCheckOtherDirs = new HashSet<String>(errorDirs);
+      for (String dir : preCheckFullDirs) {
+        if (postCheckOtherDirs.contains(dir)) {
+          LOG.warn("Directory " + dir + " error "
+              + dirsFailedCheck.get(dir).message);
+        }
       }
-    }
 
-    for (String dir : preCheckOtherErrorDirs) {
-      if (postCheckFullDirs.contains(dir)) {
-        LOG.warn("Directory " + dir + " error "
-            + dirsFailedCheck.get(dir).message);
+      for (String dir : preCheckOtherErrorDirs) {
+        if (postCheckFullDirs.contains(dir)) {
+          LOG.warn("Directory " + dir + " error "
+              + dirsFailedCheck.get(dir).message);
+        }
       }
-    }
-    setGoodDirsDiskUtilizationPercentage();
-    if (setChanged) {
-      for (DirsChangeListener listener : dirsChangeListeners) {
-        listener.onDirsChanged();
+      setGoodDirsDiskUtilizationPercentage();
+      if (setChanged) {
+        for (DirsChangeListener listener : dirsChangeListeners) {
+          listener.onDirsChanged();
+        }
       }
+      return setChanged;
+    } finally {
+      this.writeLock.unlock();
     }
-    return setChanged;
   }
 
   Map<String, DiskErrorInformation> testDirs(List<String> dirs,
@@ -409,7 +474,11 @@ public class DirectoryCollection {
       localFs.getFileStatus(dir);
     } catch (FileNotFoundException e) {
       createDir(localFs, dir.getParent(), perm);
-      localFs.mkdir(dir, perm, false);
+      try {
+        localFs.mkdir(dir, perm, false);
+      } catch (FileAlreadyExistsException ex) {
+        // do nothing as other threads could in creating the same directory.
+      }
       if (!perm.equals(perm.applyUMask(localFs.getUMask()))) {
         localFs.setPermission(dir, perm);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to