This is an automated email from the ASF dual-hosted git repository. slfan1989 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 89ebb97b0a22 YARN-11191. Fix potential deadlock in GlobalScheduler refreshQueues. (#6768) Contributed by Tamas Domok. 89ebb97b0a22 is described below commit 89ebb97b0a224fc967f5fbf3c4d61f8415635008 Author: Tamas Domok <domokta...@gmail.com> AuthorDate: Sun Apr 28 02:07:24 2024 +0200 YARN-11191. Fix potential deadlock in GlobalScheduler refreshQueues. (#6768) Contributed by Tamas Domok. Signed-off-by: Shilun Fan <slfan1...@apache.org> --- .../scheduler/capacity/AbstractLeafQueue.java | 5 ++ .../scheduler/capacity/AbstractParentQueue.java | 13 ++++ .../scheduler/capacity/CSQueue.java | 6 ++ .../capacity/preemption/PreemptionManager.java | 6 +- .../scheduler/capacity/TestCapacityScheduler.java | 76 ++++++++++++++++++++++ 5 files changed, 104 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java index 280d3d182fb2..4388489a3b94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java @@ -369,6 +369,11 @@ public class AbstractLeafQueue extends AbstractCSQueue { return null; } + @Override + public List<CSQueue> getChildQueuesByTryLock() { + return null; + } + /** * Set user limit.
* @param userLimit new user limit diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java index 50516dd2bc5f..4ab8cdf8b36e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; @@ -1348,6 +1349,18 @@ public abstract class AbstractParentQueue extends AbstractCSQueue { } + @Override + public List<CSQueue> getChildQueuesByTryLock() { + try { + while (!readLock.tryLock()){ + LockSupport.parkNanos(10000); + } + return new ArrayList<>(childQueues); + } finally { + readLock.unlock(); + } + } + @Override public void recoverContainer(Resource clusterResource, SchedulerApplicationAttempt attempt, RMContainer rmContainer) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 72f3cd16fe82..a8ee15303f8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -175,6 +175,12 @@ public interface CSQueue extends SchedulerQueue<CSQueue> { * @return child queues */ public List<CSQueue> getChildQueues(); + + /** + * Get child queues By tryLock. + * @return child queues + */ + List<CSQueue> getChildQueuesByTryLock(); /** * Check if the <code>user</code> has permission to perform the operation diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java index 408198f70461..3aab8e8a5090 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import 
java.util.concurrent.locks.ReentrantReadWriteLock; @@ -55,8 +56,9 @@ public class PreemptionManager { new PreemptableQueue(parentEntity)); } - if (current.getChildQueues() != null) { - for (CSQueue child : current.getChildQueues()) { + List<CSQueue> childQueues = current.getChildQueuesByTryLock(); + if (childQueues != null) { + for (CSQueue child : childQueues) { refreshQueues(current, child); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index ede33b6f3868..f22560a8168c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -83,6 +83,7 @@ import org.apache.hadoop.util.Sets; import org.apache.hadoop.service.ServiceStateException; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -3044,4 +3045,79 @@ public class TestCapacityScheduler { Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize()); rm1.close(); } + + /** + * (YARN-11191) This test ensures that no deadlock happens while the + * refreshQueues is called on the preemptionManager (refresh thread) and the + * 
AbstractCSQueue.getTotalKillableResource is called from the schedule thread. + * + * @throws Exception TestTimedOutException means deadlock + */ + @Test (timeout = 20000) + public void testRefreshQueueWithOpenPreemption() throws Exception { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{"a"}); + csConf.setCapacity("root.a", 100); + csConf.setQueues("root.a", new String[]{"b"}); + csConf.setCapacity("root.a.b", 100); + + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + try (MockRM rm = new MockRM(csConf)) { + CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); + PreemptionManager preemptionManager = scheduler.getPreemptionManager(); + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + AbstractParentQueue queue = (AbstractParentQueue) scheduler.getQueue("a"); + + // The scheduler thread holds the queue's read-lock for 5 seconds + // then the preemption's read-lock is used + Thread schedulerThread = new Thread(() -> { + queue.readLock.lock(); + try { + Thread.sleep(5 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + preemptionManager.getKillableContainers("a", + queue.getDefaultNodeLabelExpression()); + queue.readLock.unlock(); + }, "SCHEDULE"); + + // The complete thread locks/unlocks the queue's write-lock after 1 seconds + Thread completeThread = new Thread(() -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + queue.writeLock.lock(); + queue.writeLock.unlock(); + }, "COMPLETE"); + + + // The refresh thread holds the preemption's write-lock after 2 seconds + // while it calls the getChildQueues(ByTryLock) that + // locks(tryLocks) the queue's read-lock + Thread 
refreshThread = new Thread(() -> { + try { + Thread.sleep(2 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + preemptionManager.refreshQueues(queue.getParent(), queue); + }, "REFRESH"); + schedulerThread.start(); + completeThread.start(); + refreshThread.start(); + + schedulerThread.join(); + completeThread.join(); + refreshThread.join(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org