This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 89ebb97b0a22 YARN-11191. Fix potential deadlock in GlobalScheduler refreshQueues. (#6768) Contributed by Tamas Domok.
89ebb97b0a22 is described below

commit 89ebb97b0a224fc967f5fbf3c4d61f8415635008
Author: Tamas Domok <domokta...@gmail.com>
AuthorDate: Sun Apr 28 02:07:24 2024 +0200

    YARN-11191. Fix potential deadlock in GlobalScheduler refreshQueues. (#6768) Contributed by Tamas Domok.
    
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../scheduler/capacity/AbstractLeafQueue.java      |  5 ++
 .../scheduler/capacity/AbstractParentQueue.java    | 13 ++++
 .../scheduler/capacity/CSQueue.java                |  6 ++
 .../capacity/preemption/PreemptionManager.java     |  6 +-
 .../scheduler/capacity/TestCapacityScheduler.java  | 76 ++++++++++++++++++++++
 5 files changed, 104 insertions(+), 2 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
index 280d3d182fb2..4388489a3b94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
@@ -369,6 +369,11 @@ public class AbstractLeafQueue extends AbstractCSQueue {
     return null;
   }
 
+  @Override
+  public List<CSQueue> getChildQueuesByTryLock() {
+    return null; // no child list to hand out here; callers must null-check
+  }
+
   /**
    * Set user limit.
    * @param userLimit new user limit
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
index 50516dd2bc5f..4ab8cdf8b36e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
@@ -1348,6 +1349,18 @@ public abstract class AbstractParentQueue extends AbstractCSQueue {
 
   }
 
+  @Override
+  public List<CSQueue> getChildQueuesByTryLock() {
+    while (!readLock.tryLock()) { // poll instead of blocking in lock()
+      LockSupport.parkNanos(10000); // 10 microseconds between attempts
+    }
+    try { // reached only once the read-lock is held, so the unlock is safe
+      return new ArrayList<>(childQueues); // copy taken under the lock
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public void recoverContainer(Resource clusterResource,
       SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 72f3cd16fe82..a8ee15303f8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -175,6 +175,12 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    * @return child queues
    */
   public List<CSQueue> getChildQueues();
+
+  /**
+   * Get child queues; implementations may take their lock via tryLock.
+   * @return child queues, or null (callers are expected to null-check)
+   */
+  List<CSQueue> getChildQueuesByTryLock();
   
   /**
    * Check if the <code>user</code> has permission to perform the operation
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
index 408198f70461..3aab8e8a5090 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -55,8 +56,9 @@ public class PreemptionManager {
             new PreemptableQueue(parentEntity));
       }
 
-      if (current.getChildQueues() != null) {
-        for (CSQueue child : current.getChildQueues()) {
+      List<CSQueue> childQueues = current.getChildQueuesByTryLock();
+      if (childQueues != null) {
+        for (CSQueue child : childQueues) {
           refreshQueues(current, child);
         }
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index ede33b6f3868..f22560a8168c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -3044,4 +3045,79 @@ public class TestCapacityScheduler {
     Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize());
     rm1.close();
   }
+
+  /**
+   * (YARN-11191) Ensures PreemptionManager.refreshQueues (refresh thread) does
+   * not deadlock with a thread that holds a queue's read-lock and then calls
+   * getKillableContainers, while another thread waits for the write-lock.
+   *
+   * @throws Exception TestTimedOutException means deadlock
+   */
+  @Test (timeout = 20000)
+  public void testRefreshQueueWithOpenPreemption() throws Exception {
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{"a"});
+    csConf.setCapacity("root.a", 100);
+    csConf.setQueues("root.a", new String[]{"b"});
+    csConf.setCapacity("root.a.b", 100);
+
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    try (MockRM rm = new MockRM(csConf)) {
+      CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+      PreemptionManager preemptionManager = scheduler.getPreemptionManager();
+      rm.getRMContext().setNodeLabelManager(mgr);
+      rm.start();
+
+      AbstractParentQueue queue = (AbstractParentQueue) scheduler.getQueue("a");
+
+      // The scheduler thread holds the queue's read-lock for 5 seconds, then
+      // uses the preemption's read-lock; finally always frees the queue lock
+      Thread schedulerThread = new Thread(() -> {
+        queue.readLock.lock();
+        try {
+          Thread.sleep(5 * 1000);
+          preemptionManager.getKillableContainers("a",
+              queue.getDefaultNodeLabelExpression());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } finally {
+          queue.readLock.unlock();
+        }
+      }, "SCHEDULE");
+
+      // The complete thread locks/unlocks the queue's write-lock after 1 second
+      Thread completeThread = new Thread(() -> {
+        try {
+          Thread.sleep(1000);
+          queue.writeLock.lock();
+          queue.writeLock.unlock();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }, "COMPLETE");
+
+      // The refresh thread holds the preemption's write-lock after 2 seconds
+      // while it calls the getChildQueues(ByTryLock) that
+      // locks(tryLocks) the queue's read-lock
+      Thread refreshThread = new Thread(() -> {
+        try {
+          Thread.sleep(2 * 1000);
+          preemptionManager.refreshQueues(queue.getParent(), queue);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }, "REFRESH");
+      schedulerThread.start();
+      completeThread.start();
+      refreshThread.start();
+
+      schedulerThread.join();
+      completeThread.join();
+      refreshThread.join();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to