This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 75ad60ba99bd YARN-11191. Fix potentional deadlock in GlobalScheduler 
refreshQueues (#6732) (#6769) Contributed by Tamas Domok
75ad60ba99bd is described below

commit 75ad60ba99bdb4bd044879af20cab4f368866613
Author: Tamas Domok <tdo...@cloudera.com>
AuthorDate: Sat Apr 27 14:32:28 2024 +0200

    YARN-11191. Fix potentional deadlock in GlobalScheduler refreshQueues 
(#6732) (#6769) Contributed by Tamas Domok
    
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../scheduler/capacity/CSQueue.java                |  6 ++
 .../scheduler/capacity/LeafQueue.java              |  7 +-
 .../scheduler/capacity/ParentQueue.java            | 13 ++++
 .../capacity/preemption/PreemptionManager.java     |  6 +-
 .../scheduler/capacity/TestCapacityScheduler.java  | 76 ++++++++++++++++++++++
 5 files changed, 105 insertions(+), 3 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 77d8fa9c7ce6..0f9abef7663b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -157,6 +157,12 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    * @return child queues
    */
   public List<CSQueue> getChildQueues();
+
+  /**
+   * Get child queues By tryLock.
+   * @return child queues
+   */
+  List<CSQueue> getChildQueuesByTryLock();
   
   /**
    * Check if the <code>user</code> has permission to perform the operation
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index ccf33632efd7..c0423a015c2d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -378,7 +378,12 @@ public class LeafQueue extends AbstractCSQueue {
   public List<CSQueue> getChildQueues() {
     return null;
   }
-  
+
+  @Override
+  public List<CSQueue> getChildQueuesByTryLock() {
+    return null;
+  }
+
   /**
    * Set user limit - used only for testing.
    * @param userLimit new user limit
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 1f31d8b0a0da..7c7ce1de0bab 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -1160,6 +1161,18 @@ public class ParentQueue extends AbstractCSQueue {
     }
 
   }
+
+  @Override
+  public List<CSQueue> getChildQueuesByTryLock() {
+    try {
+      while (!readLock.tryLock()) {
+        LockSupport.parkNanos(10000);
+      }
+      return new ArrayList<>(childQueues);
+    } finally {
+      readLock.unlock();
+    }
+  }
   
   @Override
   public void recoverContainer(Resource clusterResource,
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
index 47ae2ada22e6..440841ba44b2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -55,8 +56,9 @@ public class PreemptionManager {
             new PreemptableQueue(parentEntity));
       }
 
-      if (current.getChildQueues() != null) {
-        for (CSQueue child : current.getChildQueues()) {
+      List<CSQueue> childQueues = current.getChildQueuesByTryLock();
+      if (childQueues != null) {
+        for (CSQueue child : childQueues) {
           refreshQueues(current, child);
         }
       }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 8bc18481cb29..53b6b001e26c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -56,6 +56,7 @@ import java.util.concurrent.CyclicBarrier;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
 import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -6053,4 +6054,79 @@ public class TestCapacityScheduler extends 
CapacitySchedulerTestBase {
     Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize());
     rm1.close();
   }
+
+  /**
+   * (YARN-11191) This test ensures that no deadlock happens while the
+   * refreshQueues is called on the preemptionManager (refresh thread) and the
+   * AbstractCSQueue.getTotalKillableResource is called from the schedule 
thread.
+   *
+   * @throws Exception TestTimedOutException means deadlock
+   */
+  @Test (timeout = 20000)
+  public void testRefreshQueueWithOpenPreemption() throws Exception {
+    CapacitySchedulerConfiguration csConf = new 
CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{"a"});
+    csConf.setCapacity("root.a", 100);
+    csConf.setQueues("root.a", new String[]{"b"});
+    csConf.setCapacity("root.a.b", 100);
+
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    try (MockRM rm = new MockRM(csConf)) {
+      CapacityScheduler scheduler = (CapacityScheduler) 
rm.getResourceScheduler();
+      PreemptionManager preemptionManager = scheduler.getPreemptionManager();
+      rm.getRMContext().setNodeLabelManager(mgr);
+      rm.start();
+
+      ParentQueue queue = (ParentQueue) scheduler.getQueue("a");
+
+      // The scheduler thread holds the queue's read-lock for 5 seconds
+      // then the preemption's read-lock is used
+      Thread schedulerThread = new Thread(() -> {
+        queue.readLock.lock();
+        try {
+          Thread.sleep(5 * 1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        preemptionManager.getKillableContainers("a",
+            queue.getDefaultNodeLabelExpression());
+        queue.readLock.unlock();
+      }, "SCHEDULE");
+
+      // The complete thread locks/unlocks the queue's write-lock after 1 
seconds
+      Thread completeThread = new Thread(() -> {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        queue.writeLock.lock();
+        queue.writeLock.unlock();
+      }, "COMPLETE");
+
+
+      // The refresh thread holds the preemption's write-lock after 2 seconds
+      // while it calls the getChildQueues(ByTryLock) that
+      // locks(tryLocks) the queue's read-lock
+      Thread refreshThread = new Thread(() -> {
+        try {
+          Thread.sleep(2 * 1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        preemptionManager.refreshQueues(queue.getParent(), queue);
+      }, "REFRESH");
+      schedulerThread.start();
+      completeThread.start();
+      refreshThread.start();
+
+      schedulerThread.join();
+      completeThread.join();
+      refreshThread.join();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to