This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e220e88  YARN-10996. Fix race condition of User object acquisitions. Contributed by Andras Gyori
e220e88 is described below

commit e220e88eca26311af707f7969da36d402f022a8d
Author: Szilard Nemeth <snem...@apache.org>
AuthorDate: Fri Nov 12 15:33:39 2021 +0100

    YARN-10996. Fix race condition of User object acquisitions. Contributed by Andras Gyori
---
 .../capacity/FifoIntraQueuePreemptionPlugin.java    | 10 ++++++----
 .../scheduler/capacity/LeafQueue.java               | 21 ++++++++++++++++++---
 .../scheduler/capacity/UsersManager.java            |  3 ++-
 .../capacity/mockframework/MockApplications.java    |  1 +
 4 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index a9d3c05..ea17fed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -30,6 +30,7 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -419,8 +420,10 @@ public class FifoIntraQueuePreemptionPlugin
       String userName = app.getUser();
       TempUserPerPartition tmpUser = usersPerPartition.get(userName);
       if (tmpUser == null) {
-        ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
-            .getResourceUsage();
+        // User might have already been removed, but preemption still accounts for this app,
+        // therefore reinserting the user will not cause a memory leak
+        User  user = tq.leafQueue.getOrCreateUser(userName);
+        ResourceUsage userResourceUsage = user.getResourceUsage();
 
         // perUserAMUsed was populated with running apps, now we are looping
         // through both running and pending apps.
@@ -428,8 +431,7 @@ public class FifoIntraQueuePreemptionPlugin
         amUsed = (userSpecificAmUsed == null)
             ? Resources.none() : userSpecificAmUsed;
 
-        tmpUser = new TempUserPerPartition(
-            tq.leafQueue.getUser(userName), tq.queueName,
+        tmpUser = new TempUserPerPartition(user, tq.queueName,
             Resources.clone(userResourceUsage.getUsed(partition)),
             Resources.clone(amUsed),
             Resources.clone(userResourceUsage.getReserved(partition)),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a4e2a82..c7f42d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -519,6 +519,11 @@ public class LeafQueue extends AbstractCSQueue {
     return usersManager.getUser(userName);
   }
 
+  @VisibleForTesting
+  public User getOrCreateUser(String userName) {
+    return usersManager.getUserAndAddIfAbsent(userName);
+  }
+
   @Private
   public List<AppPriorityACLGroup> getPriorityACLs() {
     readLock.lock();
@@ -2007,7 +2012,12 @@ public class LeafQueue extends AbstractCSQueue {
 
   public void incAMUsedResource(String nodeLabel, Resource resourceToInc,
       SchedulerApplicationAttempt application) {
-    getUser(application.getUser()).getResourceUsage().incAMUsed(nodeLabel,
+    User user = getUser(application.getUser());
+    if (user == null) {
+      return;
+    }
+
+    user.getResourceUsage().incAMUsed(nodeLabel,
         resourceToInc);
     // ResourceUsage has its own lock, no addition lock needs here.
     usageTracker.getQueueUsage().incAMUsed(nodeLabel, resourceToInc);
@@ -2015,7 +2025,12 @@ public class LeafQueue extends AbstractCSQueue {
 
   public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
       SchedulerApplicationAttempt application) {
-    getUser(application.getUser()).getResourceUsage().decAMUsed(nodeLabel,
+    User user = getUser(application.getUser());
+    if (user == null) {
+      return;
+    }
+
+    user.getResourceUsage().decAMUsed(nodeLabel,
         resourceToDec);
     // ResourceUsage has its own lock, no addition lock needs here.
     usageTracker.getQueueUsage().decAMUsed(nodeLabel, resourceToDec);
@@ -2103,7 +2118,7 @@ public class LeafQueue extends AbstractCSQueue {
       for (FiCaSchedulerApp app : getApplications()) {
         String userName = app.getUser();
         if (!userNameToHeadroom.containsKey(userName)) {
-          User user = getUser(userName);
+          User user = getUsersManager().getUserAndAddIfAbsent(userName);
           Resource headroom = Resources.subtract(
               getResourceLimitForActiveUsers(app.getUser(), clusterResources,
                   partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 8ba13f0..94df9ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -817,6 +817,7 @@ public class UsersManager implements AbstractUsersManager {
             lQueue.getMinimumAllocation());
 
     if (LOG.isDebugEnabled()) {
+      float weight = lQueue.getUserWeights().getByUser(userName);
       LOG.debug("User limit computation for " + userName
           + ",  in queue: " + lQueue.getQueuePath()
           + ",  userLimitPercent=" + lQueue.getUserLimit()
@@ -834,7 +835,7 @@ public class UsersManager implements AbstractUsersManager {
           + ",  Partition=" + nodePartition
           + ",  resourceUsed=" + resourceUsed
           + ",  maxUserLimit=" + maxUserLimit
-          + ",  userWeight=" + getUser(userName).getWeight()
+          + ",  userWeight=" + weight
       );
     }
     return userLimitResource;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java
index b168612..aa00a1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java
@@ -165,6 +165,7 @@ class MockApplications {
         user.setResourceUsage(userResourceUsage.get(userName));
       }
       when(queue.getUser(eq(userName))).thenReturn(user);
+      when(queue.getOrCreateUser(eq(userName))).thenReturn(user);
       when(queue.getResourceLimitForAllUsers(eq(userName),
           any(Resource.class), anyString(), any(SchedulingMode.class)))
           .thenReturn(userLimit);

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to