YARN-3955. Support for application priority ACLs in queues of 
CapacityScheduler. (Sunil G via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/287d3d68
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/287d3d68
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/287d3d68

Branch: refs/heads/HADOOP-13345
Commit: 287d3d6804a869723ae36605a3c2d2b3eae3941e
Parents: db490ec
Author: Wangda Tan <wan...@apache.org>
Authored: Mon Jan 9 08:40:39 2017 -0800
Committer: Wangda Tan <wan...@apache.org>
Committed: Mon Jan 9 08:40:39 2017 -0800

----------------------------------------------------------------------
 .../sls/scheduler/ResourceSchedulerWrapper.java |   2 +-
 .../apache/hadoop/yarn/security/AccessType.java |   2 +
 .../conf/capacity-scheduler.xml                 |   9 +
 .../server/resourcemanager/ClientRMService.java |   3 +-
 .../server/resourcemanager/RMAppManager.java    |  16 +-
 .../scheduler/AbstractYarnScheduler.java        |  10 +-
 .../scheduler/YarnScheduler.java                |  11 +-
 .../AppPriorityACLConfigurationParser.java      | 219 ++++++++++++++++++
 .../scheduler/capacity/AppPriorityACLGroup.java | 108 +++++++++
 .../scheduler/capacity/CapacityScheduler.java   | 153 +++++++-----
 .../CapacitySchedulerConfiguration.java         |  44 +++-
 .../capacity/CapacitySchedulerContext.java      |   7 +
 .../capacity/CapacitySchedulerQueueManager.java |  23 +-
 .../scheduler/capacity/LeafQueue.java           |  16 ++
 .../security/AppPriorityACLsManager.java        | 230 +++++++++++++++++++
 .../server/resourcemanager/ACLsTestBase.java    |   5 +-
 .../TestApplicationMasterService.java           |   5 +-
 .../resourcemanager/TestClientRMService.java    |   2 +-
 .../capacity/TestApplicationPriority.java       |  25 +-
 ...TestApplicationPriorityACLConfiguration.java | 120 ++++++++++
 .../capacity/TestApplicationPriorityACLs.java   | 206 +++++++++++++++++
 .../scheduler/capacity/TestParentQueue.java     |   6 +-
 22 files changed, 1127 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index e66de2f..5517362 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -963,7 +963,7 @@ final public class ResourceSchedulerWrapper
 
   @Override
   public Priority checkAndGetApplicationPriority(Priority priority,
-      String user, String queueName, ApplicationId applicationId)
+      UserGroupInformation user, String queueName, ApplicationId applicationId)
       throws YarnException {
     // TODO Dummy implementation.
     return Priority.newInstance(0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java
index 32459b9..fb4484b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java
@@ -30,4 +30,6 @@ public enum AccessType {
   // queue
   SUBMIT_APP,
   ADMINISTER_QUEUE,
+  // application
+  APPLICATION_MAX_PRIORITY,
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
index 6ac726e..47db01f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
@@ -98,6 +98,15 @@
   </property>
 
   <property>
+    
<name>yarn.scheduler.capacity.root.default.acl_application_max_priority</name>
+    <value>*</value>
+    <description>
+      The ACL of who can submit applications with configured priority.
+      For e.g, [user={name} group={name} max_priority={priority} 
default_priority={priority}]
+    </description>
+  </property>
+
+  <property>
     <name>yarn.scheduler.capacity.node-locality-delay</name>
     <value>40</value>
     <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index cdf30a1..add522b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -1614,7 +1614,8 @@ public class ClientRMService extends AbstractService 
implements
     }
 
     try {
-      rmAppManager.updateApplicationPriority(applicationId, newAppPriority);
+      rmAppManager.updateApplicationPriority(callerUGI, applicationId,
+          newAppPriority);
     } catch (YarnException ex) {
       RMAuditLogger.logFailure(callerUGI.getShortUserName(),
           AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 8232b88..47eec54 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -353,19 +353,19 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
       RMServerUtils.validateApplicationTimeouts(
           submissionContext.getApplicationTimeouts());
     }
-    
+
     ApplicationId applicationId = submissionContext.getApplicationId();
     ResourceRequest amReq =
         validateAndCreateResourceRequest(submissionContext, isRecovery);
 
     // Verify and get the update application priority and set back to
     // submissionContext
+    UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
     Priority appPriority = scheduler.checkAndGetApplicationPriority(
-        submissionContext.getPriority(), user, submissionContext.getQueue(),
+        submissionContext.getPriority(), userUgi, submissionContext.getQueue(),
         applicationId);
     submissionContext.setPriority(appPriority);
 
-    UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
     // Since FairScheduler queue mapping is done inside scheduler,
     // if FairScheduler is used and the queue doesn't exist, we should not
     // fail here because queue will be created inside FS. Ideally, FS queue
@@ -569,12 +569,14 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
   /**
    * updateApplicationPriority will invoke scheduler api to update the
    * new priority to RM and StateStore.
+   * @param callerUGI user
    * @param applicationId Application Id
    * @param newAppPriority proposed new application priority
    * @throws YarnException Handle exceptions
    */
-  public void updateApplicationPriority(ApplicationId applicationId,
-      Priority newAppPriority) throws YarnException {
+  public void updateApplicationPriority(UserGroupInformation callerUGI,
+      ApplicationId applicationId, Priority newAppPriority)
+      throws YarnException {
     RMApp app = this.rmContext.getRMApps().get(applicationId);
 
     synchronized (applicationId) {
@@ -587,8 +589,8 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
 
       // Invoke scheduler api to update priority in scheduler and to
       // State Store.
-      Priority appPriority = rmContext.getScheduler()
-          .updateApplicationPriority(newAppPriority, applicationId, future);
+      Priority appPriority = 
rmContext.getScheduler().updateApplicationPriority(
+          newAppPriority, applicationId, future, callerUGI);
 
       if (app.getApplicationPriority().equals(appPriority)) {
         return;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 89f9ffa..9ff0b24 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -788,9 +789,9 @@ public abstract class AbstractYarnScheduler
   }
 
   @Override
-  public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
-      String user, String queueName, ApplicationId applicationId)
-      throws YarnException {
+  public Priority checkAndGetApplicationPriority(
+      Priority priorityRequestedByApp, UserGroupInformation user,
+      String queueName, ApplicationId applicationId) throws YarnException {
     // Dummy Implementation till Application Priority changes are done in
     // specific scheduler.
     return Priority.newInstance(0);
@@ -798,7 +799,8 @@ public abstract class AbstractYarnScheduler
 
   @Override
   public Priority updateApplicationPriority(Priority newPriority,
-      ApplicationId applicationId, SettableFuture<Object> future)
+      ApplicationId applicationId, SettableFuture<Object> future,
+      UserGroupInformation user)
       throws YarnException {
     // Dummy Implementation till Application Priority changes are done in
     // specific scheduler.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index 5579efd..08e0603 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -307,7 +307,7 @@ public interface YarnScheduler extends 
EventHandler<SchedulerEvent> {
    * Verify whether a submitted application priority is valid as per configured
    * Queue
    *
-   * @param priorityFromContext
+   * @param priorityRequestedByApp
    *          Submitted Application priority.
    * @param user
    *          User who submitted the Application
@@ -317,8 +317,8 @@ public interface YarnScheduler extends 
EventHandler<SchedulerEvent> {
    *          Application ID
    * @return Updated Priority from scheduler
    */
-  public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
-      String user, String queueName, ApplicationId applicationId)
+  public Priority checkAndGetApplicationPriority(Priority 
priorityRequestedByApp,
+      UserGroupInformation user, String queueName, ApplicationId applicationId)
       throws YarnException;
 
   /**
@@ -330,12 +330,13 @@ public interface YarnScheduler extends 
EventHandler<SchedulerEvent> {
    * @param applicationId Application ID
    *
    * @param future Sets any type of exception happened from StateStore
+   * @param user who submitted the application
    *
    * @return updated priority
    */
   public Priority updateApplicationPriority(Priority newPriority,
-      ApplicationId applicationId, SettableFuture<Object> future)
-      throws YarnException;
+      ApplicationId applicationId, SettableFuture<Object> future,
+      UserGroupInformation user) throws YarnException;
 
   /**
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLConfigurationParser.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLConfigurationParser.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLConfigurationParser.java
new file mode 100644
index 0000000..4489a01
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLConfigurationParser.java
@@ -0,0 +1,219 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Priority;
+
+/**
+ *
+ * PriorityACLConfiguration class is used to parse Application Priority ACL
+ * configuration from capcity-scheduler.xml
+ */
+public class AppPriorityACLConfigurationParser {
+
+  private static final Log LOG = LogFactory
+      .getLog(AppPriorityACLConfigurationParser.class);
+
+  public enum AppPriorityACLKeyType {
+    USER(1), GROUP(2), MAX_PRIORITY(3), DEFAULT_PRIORITY(4);
+
+    private final int id;
+
+    AppPriorityACLKeyType(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return this.id;
+    }
+  }
+
+  public static final String PATTERN_FOR_PRIORITY_ACL = "\\[([^\\]]+)";
+
+  @Private
+  public static final String ALL_ACL = "*";
+
+  @Private
+  public static final String NONE_ACL = " ";
+
+  public List<AppPriorityACLGroup> getPriorityAcl(Priority clusterMaxPriority,
+      String aclString) {
+
+    List<AppPriorityACLGroup> aclList = new ArrayList<AppPriorityACLGroup>();
+    Matcher matcher = Pattern.compile(PATTERN_FOR_PRIORITY_ACL)
+        .matcher(aclString);
+
+    /*
+     * Each ACL group will be separated by "[]". Syntax of each ACL group could
+     * be like below "user=b1,b2 group=g1 max-priority=a2 default-priority=a1"
+     * Ideally this means "for this given user/group, maximum possible priority
+     * is a2 and if the user has not specified any priority, then it is a1."
+     */
+    while (matcher.find()) {
+      // Get the first ACL sub-group.
+      String aclSubGroup = matcher.group(1);
+      if (aclSubGroup.trim().isEmpty()) {
+        continue;
+      }
+
+      /*
+       * Internal storage is PriorityACLGroup which stores each parsed priority
+       * ACLs group. This will help while looking for a user to priority 
mapping
+       * during app submission time. ACLs will be passed in below order only. 
1.
+       * user/group 2. max-priority 3. default-priority
+       */
+      AppPriorityACLGroup userPriorityACL = new AppPriorityACLGroup();
+
+      // userAndGroupName will hold user acl and group acl as interim storage
+      // since both user/group acl comes with separate key value pairs.
+      List<StringBuilder> userAndGroupName = new ArrayList<>();
+
+      for (String kvPair : aclSubGroup.trim().split(" +")) {
+        /*
+         * There are 3 possible options for key here: 1. user/group 2.
+         * max-priority 3. default-priority
+         */
+        String[] splits = kvPair.split("=");
+
+        // Ensure that each ACL sub string is key value pair separated by '='.
+        if (splits != null && splits.length > 1) {
+          parsePriorityACLType(userPriorityACL, splits, userAndGroupName);
+        }
+      }
+
+      // If max_priority is higher to clusterMaxPriority, its better to
+      // handle here.
+      if (userPriorityACL.getMaxPriority().getPriority() > clusterMaxPriority
+          .getPriority()) {
+        LOG.warn("ACL configuration for '" + userPriorityACL.getMaxPriority()
+            + "' is greater that cluster max priority. Resetting ACLs to "
+            + clusterMaxPriority);
+        userPriorityACL.setMaxPriority(
+            Priority.newInstance(clusterMaxPriority.getPriority()));
+      }
+
+      AccessControlList acl = createACLStringForPriority(userAndGroupName);
+      userPriorityACL.setACLList(acl);
+      aclList.add(userPriorityACL);
+    }
+
+    return aclList;
+  }
+
+  /*
+   * Parse different types of ACLs sub parts for on priority group and store in
+   * a map for later processing.
+   */
+  private void parsePriorityACLType(AppPriorityACLGroup userPriorityACL,
+      String[] splits, List<StringBuilder> userAndGroupName) {
+    // Here splits will have the key value pair at index 0 and 1 respectively.
+    // To parse all keys, its better to convert to PriorityACLConfig enum.
+    AppPriorityACLKeyType aclType = AppPriorityACLKeyType
+        .valueOf(StringUtils.toUpperCase(splits[0].trim()));
+    switch (aclType) {
+    case MAX_PRIORITY :
+      userPriorityACL
+          .setMaxPriority(Priority.newInstance(Integer.parseInt(splits[1])));
+      break;
+    case USER :
+      userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1]));
+      break;
+    case GROUP :
+      userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1]));
+      break;
+    case DEFAULT_PRIORITY :
+      int defaultPriority = Integer.parseInt(splits[1]);
+      Priority priority = (defaultPriority < 0)
+          ? Priority.newInstance(0)
+          : Priority.newInstance(defaultPriority);
+      userPriorityACL.setDefaultPriority(priority);
+      break;
+    default:
+      break;
+    }
+  }
+
+  /*
+   * This method will help to append different types of ACLs keys against one
+   * priority. For eg,USER will be appended with GROUP as "user2,user4 group1".
+   */
+  private AccessControlList createACLStringForPriority(
+      List<StringBuilder> acls) {
+
+    String finalACL = "";
+    String userACL = acls.get(0).toString();
+
+    // If any of user/group is *, consider it as acceptable for all.
+    // "user" is at index 0, and "group" is at index 1.
+    if (userACL.trim().equals(ALL_ACL)) {
+      finalACL = ALL_ACL;
+    } else if (userACL.equals(NONE_ACL)) {
+      finalACL = NONE_ACL;
+    } else {
+
+      // Get USER segment
+      if (!userACL.trim().isEmpty()) {
+        // skip last appended ","
+        finalACL = acls.get(0).toString();
+      }
+
+      // Get GROUP segment if any
+      if (acls.size() > 1) {
+        String groupACL = acls.get(1).toString();
+        if (!groupACL.trim().isEmpty()) {
+          finalACL = finalACL + " "
+              + acls.get(1).toString();
+        }
+      }
+    }
+
+    // Here ACL will look like "user1,user2 group" in ideal cases.
+    return new AccessControlList(finalACL.trim());
+  }
+
+  /*
+   * This method will help to append user/group acl string against given
+   * priority. For example "user1,user2 group1,group2"
+   */
+  private StringBuilder getUserOrGroupACLStringFromConfig(String value) {
+
+    // ACL strings could be generate for USER or GRUOP.
+    // aclList in map contains two entries. 1. USER, 2. GROUP.
+    StringBuilder aclTypeName = new StringBuilder();
+
+    if (value.trim().equals(ALL_ACL)) {
+      aclTypeName.setLength(0);
+      aclTypeName.append(ALL_ACL);
+      return aclTypeName;
+    }
+
+    aclTypeName.append(value.trim());
+    return aclTypeName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLGroup.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLGroup.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLGroup.java
new file mode 100644
index 0000000..cb5ebcb
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLGroup.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.Priority;
+
+/**
+ * PriorityACLGroup will hold all ACL related information per priority.
+ *
+ */
+public class AppPriorityACLGroup implements Comparable<AppPriorityACLGroup> {
+
+  private Priority maxPriority = null;
+  private Priority defaultPriority = null;
+  private AccessControlList aclList = null;
+
+  public AppPriorityACLGroup(Priority maxPriority, Priority defaultPriority,
+      AccessControlList aclList) {
+    this.setMaxPriority(Priority.newInstance(maxPriority.getPriority()));
+    this.setDefaultPriority(
+        Priority.newInstance(defaultPriority.getPriority()));
+    this.setACLList(aclList);
+  }
+
+  public AppPriorityACLGroup() {
+  }
+
+  @Override
+  public int compareTo(AppPriorityACLGroup o) {
+    return getMaxPriority().compareTo(o.getMaxPriority());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if (obj == null) {
+      return false;
+    }
+
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+
+    AppPriorityACLGroup other = (AppPriorityACLGroup) obj;
+    if (getMaxPriority() != other.getMaxPriority()) {
+      return false;
+    }
+
+    if (getDefaultPriority() != other.getDefaultPriority()) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 517861;
+    int result = 9511;
+    result = prime * result + getMaxPriority().getPriority();
+    result = prime * result + getDefaultPriority().getPriority();
+    return result;
+  }
+
+  public Priority getMaxPriority() {
+    return maxPriority;
+  }
+
+  public Priority getDefaultPriority() {
+    return defaultPriority;
+  }
+
+  public AccessControlList getACLList() {
+    return aclList;
+  }
+
+  public void setMaxPriority(Priority maxPriority) {
+    this.maxPriority = maxPriority;
+  }
+
+  public void setDefaultPriority(Priority defaultPriority) {
+    this.defaultPriority = defaultPriority;
+  }
+
+  public void setACLList(AccessControlList accessControlList) {
+    this.aclList = accessControlList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 55ffe25..c475967 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -137,6 +137,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -227,6 +228,7 @@ public class CapacityScheduler extends
   private List<AsyncScheduleThread> asyncSchedulerThreads;
   private ResourceCommitterService resourceCommitterService;
   private RMNodeLabelsManager labelManager;
+  private AppPriorityACLsManager appPriorityACLManager;
 
   /**
    * EXPERT
@@ -308,8 +310,9 @@ public class CapacityScheduler extends
       this.usePortForNodeName = this.conf.getUsePortForNodeName();
       this.applications = new ConcurrentHashMap<>();
       this.labelManager = rmContext.getNodeLabelManager();
+      this.appPriorityACLManager = new AppPriorityACLsManager(conf);
       this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
-          this.labelManager);
+          this.labelManager, this.appPriorityACLManager);
       this.queueManager.setCapacitySchedulerContext(this);
 
       this.activitiesManager = new ActivitiesManager(rmContext);
@@ -2188,86 +2191,110 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
-      String user, String queueName, ApplicationId applicationId)
-      throws YarnException {
-    Priority appPriority = null;
-
-    // ToDo: Verify against priority ACLs
-
-    // Verify the scenario where priority is null from submissionContext.
-    if (null == priorityFromContext) {
-      // Get the default priority for the Queue. If Queue is non-existent, then
-      // use default priority
-      priorityFromContext = this.queueManager.getDefaultPriorityForQueue(
-          queueName);
+  public Priority checkAndGetApplicationPriority(
+      Priority priorityRequestedByApp, UserGroupInformation user,
+      String queueName, ApplicationId applicationId) throws YarnException {
+    try {
+      readLock.lock();
+      Priority appPriority = priorityRequestedByApp;
+
+      // Verify the scenario where priority is null from submissionContext.
+      if (null == appPriority) {
+        // Verify whether submitted user has any default priority set. If so,
+        // user's default priority will get precedence over queue default.
+        // for updateApplicationPriority call flow, this check is done in
+        // CientRMService itself.
+        appPriority = this.appPriorityACLManager.getDefaultPriority(queueName,
+            user);
+
+        // Get the default priority for the Queue. If Queue is non-existent,
+        // then
+        // use default priority. Do it only if user doesnt have any default.
+        if (null == appPriority) {
+          appPriority = 
this.queueManager.getDefaultPriorityForQueue(queueName);
+        }
 
-      LOG.info("Application '" + applicationId
-          + "' is submitted without priority "
-          + "hence considering default queue/cluster priority: "
-          + priorityFromContext.getPriority());
-    }
+        LOG.info(
+            "Application '" + applicationId + "' is submitted without priority 
"
+                + "hence considering default queue/cluster priority: "
+                + appPriority.getPriority());
+      }
 
-    // Verify whether submitted priority is lesser than max priority
-    // in the cluster. If it is out of found, defining a max cap.
-    if (priorityFromContext.compareTo(getMaxClusterLevelAppPriority()) < 0) {
-      priorityFromContext = Priority
-          .newInstance(getMaxClusterLevelAppPriority().getPriority());
-    }
+      // Verify whether submitted priority is lesser than max priority
+      // in the cluster. If it is out of found, defining a max cap.
+      if (appPriority.getPriority() > getMaxClusterLevelAppPriority()
+          .getPriority()) {
+        appPriority = Priority
+            .newInstance(getMaxClusterLevelAppPriority().getPriority());
+      }
 
-    appPriority = priorityFromContext;
+      // Lets check for ACLs here.
+      if (!appPriorityACLManager.checkAccess(user, queueName, appPriority)) {
+        throw new YarnException(new AccessControlException(
+            "User " + user + " does not have permission to submit/update "
+                + applicationId + " for " + appPriority));
+      }
 
-    LOG.info("Priority '" + appPriority.getPriority()
-        + "' is acceptable in queue : " + queueName + " for application: "
-        + applicationId + " for the user: " + user);
+      LOG.info("Priority '" + appPriority.getPriority()
+          + "' is acceptable in queue : " + queueName + " for application: "
+          + applicationId);
 
-    return appPriority;
+      return appPriority;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
   public Priority updateApplicationPriority(Priority newPriority,
-      ApplicationId applicationId, SettableFuture<Object> future)
+      ApplicationId applicationId, SettableFuture<Object> future,
+      UserGroupInformation user)
       throws YarnException {
-    Priority appPriority = null;
-    SchedulerApplication<FiCaSchedulerApp> application = applications
-        .get(applicationId);
+    try {
+      writeLock.lock();
+      Priority appPriority = null;
+      SchedulerApplication<FiCaSchedulerApp> application = applications
+          .get(applicationId);
 
-    if (application == null) {
-      throw new YarnException("Application '" + applicationId
-          + "' is not present, hence could not change priority.");
-    }
+      if (application == null) {
+        throw new YarnException("Application '" + applicationId
+            + "' is not present, hence could not change priority.");
+      }
 
-    RMApp rmApp = rmContext.getRMApps().get(applicationId);
+      RMApp rmApp = rmContext.getRMApps().get(applicationId);
 
-    appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(),
-        rmApp.getQueue(), applicationId);
+      appPriority = checkAndGetApplicationPriority(newPriority, user,
+          rmApp.getQueue(), applicationId);
 
-    if (application.getPriority().equals(appPriority)) {
-      future.set(null);
-      return appPriority;
-    }
+      if (application.getPriority().equals(appPriority)) {
+        future.set(null);
+        return appPriority;
+      }
 
-    // Update new priority in Submission Context to update to StateStore.
-    rmApp.getApplicationSubmissionContext().setPriority(appPriority);
+      // Update new priority in Submission Context to update to StateStore.
+      rmApp.getApplicationSubmissionContext().setPriority(appPriority);
 
-    // Update to state store
-    ApplicationStateData appState = ApplicationStateData.newInstance(
-        rmApp.getSubmitTime(), rmApp.getStartTime(),
-        rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
-        rmApp.getCallerContext());
-    appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
-    rmContext.getStateStore().updateApplicationStateSynchronously(appState,
-        false, future);
+      // Update to state store
+      ApplicationStateData appState = ApplicationStateData.newInstance(
+          rmApp.getSubmitTime(), rmApp.getStartTime(),
+          rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
+          rmApp.getCallerContext());
+      appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
+      rmContext.getStateStore().updateApplicationStateSynchronously(appState,
+          false, future);
 
-    // As we use iterator over a TreeSet for OrderingPolicy, once we change
-    // priority then reinsert back to make order correct.
-    LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue());
-    queue.updateApplicationPriority(application, appPriority);
+      // As we use iterator over a TreeSet for OrderingPolicy, once we change
+      // priority then reinsert back to make order correct.
+      LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue());
+      queue.updateApplicationPriority(application, appPriority);
 
-    LOG.info("Priority '" + appPriority + "' is updated in queue :"
-        + rmApp.getQueue() + " for application: " + applicationId
-        + " for the user: " + rmApp.getUser());
-    return appPriority;
+      LOG.info("Priority '" + appPriority + "' is updated in queue :"
+          + rmApp.getQueue() + " for application: " + applicationId
+          + " for the user: " + rmApp.getUser());
+      return appPriority;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index bfaeba4..eb148d2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.ReservationACL;
@@ -49,6 +50,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import 
org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@@ -63,7 +65,7 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
 
   private static final Log LOG = 
     LogFactory.getLog(CapacitySchedulerConfiguration.class);
-  
+
   private static final String CS_CONFIGURATION_FILE = "capacity-scheduler.xml";
   
   @Private
@@ -274,6 +276,8 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
   @Private
   public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
 
+  AppPriorityACLConfigurationParser priorityACLConfig = new 
AppPriorityACLConfigurationParser();
+
   public CapacitySchedulerConfiguration() {
     this(new Configuration());
   }
@@ -602,6 +606,10 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
     return "acl_" + StringUtils.toLowerCase(acl.toString());
   }
 
+  private static String getAclKey(AccessType acl) {
+    return "acl_" + StringUtils.toLowerCase(acl.toString());
+  }
+
   @Override
   public Map<ReservationACL, AccessControlList> getReservationAcls(String
         queue) {
@@ -627,6 +635,11 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
     set(queuePrefix + getAclKey(acl), aclString);
   }
 
+  private void setAcl(String queue, AccessType acl, String aclString) {
+    String queuePrefix = getQueuePrefix(queue);
+    set(queuePrefix + getAclKey(acl), aclString);
+  }
+
   public Map<AccessType, AccessControlList> getAcls(String queue) {
     Map<AccessType, AccessControlList> acls =
       new HashMap<AccessType, AccessControlList>();
@@ -650,6 +663,35 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
     }
   }
 
+  @VisibleForTesting
+  public void setPriorityAcls(String queue, Priority priority,
+      Priority defaultPriority, String[] acls) {
+    StringBuilder aclString = new StringBuilder();
+
+    StringBuilder userAndGroup = new StringBuilder();
+    for (int i = 0; i < acls.length; i++) {
+      userAndGroup.append(AppPriorityACLKeyType.values()[i] + "=" + 
acls[i].trim())
+          .append(" ");
+    }
+
+    aclString.append("[" + userAndGroup.toString().trim() + " "
+        + "max_priority=" + priority.getPriority() + " " + "default_priority="
+        + defaultPriority.getPriority() + "]");
+
+    setAcl(queue, AccessType.APPLICATION_MAX_PRIORITY, aclString.toString());
+  }
+
+  public List<AppPriorityACLGroup> getPriorityAcls(String queue,
+      Priority clusterMaxPriority) {
+    String queuePrefix = getQueuePrefix(queue);
+    String defaultAcl = ALL_ACL;
+    String aclString = get(
+        queuePrefix + getAclKey(AccessType.APPLICATION_MAX_PRIORITY),
+        defaultAcl);
+
+    return priorityACLConfig.getPriorityAcl(clusterMaxPriority, aclString);
+  }
+
   public String[] getQueues(String queue) {
     LOG.debug("CSConf - getQueues called for: queuePrefix=" + 
getQueuePrefix(queue));
     String[] queues = getStrings(getQueuePrefix(queue) + QUEUES);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index 7d29619..504acb9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@@ -85,4 +86,10 @@ public interface CapacitySchedulerContext {
   ActivitiesManager getActivitiesManager();
 
   CapacitySchedulerQueueManager getCapacitySchedulerQueueManager();
+
+  /**
+   *
+   * @return Max Cluster level App priority.
+   */
+  Priority getMaxClusterLevelAppPriority();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index 6a3c08a..ddcbc0e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -43,6 +43,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 
 /**
  *
@@ -86,6 +87,7 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
   private final Map<String, CSQueue> queues = new ConcurrentHashMap<>();
   private CSQueue root;
   private final RMNodeLabelsManager labelManager;
+  private AppPriorityACLsManager appPriorityACLManager;
 
   private QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
       queueStateManager;
@@ -94,12 +96,15 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
    * Construct the service.
    * @param conf the configuration
    * @param labelManager the labelManager
+   * @param appPriorityACLManager App priority ACL manager
    */
   public CapacitySchedulerQueueManager(Configuration conf,
-      RMNodeLabelsManager labelManager) {
+      RMNodeLabelsManager labelManager,
+      AppPriorityACLsManager appPriorityACLManager) {
     this.authorizer = YarnAuthorizationProvider.getInstance(conf);
     this.labelManager = labelManager;
     this.queueStateManager = new QueueStateManager<>();
+    this.appPriorityACLManager = appPriorityACLManager;
   }
 
   @Override
@@ -145,7 +150,7 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
       throws IOException {
     root = parseQueue(this.csContext, conf, null,
         CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
-    setQueueAcls(authorizer, queues);
+    setQueueAcls(authorizer, appPriorityACLManager, queues);
     labelManager.reinitializeQueueLabels(getQueueToLabels());
     this.queueStateManager.initialize(this);
     LOG.info("Initialized root queue " + root);
@@ -168,7 +173,7 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
     // Re-configure queues
     root.reinitialize(newRoot, this.csContext.getClusterResource());
 
-    setQueueAcls(authorizer, queues);
+    setQueueAcls(authorizer, appPriorityACLManager, queues);
 
     // Re-calculate headroom for active applications
     Resource clusterResource = this.csContext.getClusterResource();
@@ -305,12 +310,22 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
    * @throws IOException if fails to set queue acls
    */
   public static void setQueueAcls(YarnAuthorizationProvider authorizer,
-      Map<String, CSQueue> queues) throws IOException {
+      AppPriorityACLsManager appPriorityACLManager, Map<String, CSQueue> 
queues)
+      throws IOException {
     List<Permission> permissions = new ArrayList<>();
     for (CSQueue queue : queues.values()) {
       AbstractCSQueue csQueue = (AbstractCSQueue) queue;
       permissions.add(
           new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
+
+      if (queue instanceof LeafQueue) {
+        LeafQueue lQueue = (LeafQueue) queue;
+
+        // Clear Priority ACLs first since reinitialize also call same.
+        appPriorityACLManager.clearPriorityACLs(lQueue.getQueueName());
+        appPriorityACLManager.addPrioirityACLs(lQueue.getPriorityACLs(),
+            lQueue.getQueueName());
+      }
     }
     authorizer.setPermission(permissions,
         UserGroupInformation.getCurrentUser());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 18b38f4..6d4dbe0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -140,6 +140,9 @@ public class LeafQueue extends AbstractCSQueue {
   private Map<String, TreeSet<RMContainer>> 
ignorePartitionExclusivityRMContainers =
       new ConcurrentHashMap<>();
 
+  List<AppPriorityACLGroup> priorityAcls =
+      new ArrayList<AppPriorityACLGroup>();
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public LeafQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -205,6 +208,9 @@ public class LeafQueue extends AbstractCSQueue {
           conf.getMaximumApplicationMasterResourcePerQueuePercent(
               getQueuePath());
 
+      priorityAcls = conf.getPriorityAcls(getQueuePath(),
+          scheduler.getMaxClusterLevelAppPriority());
+
       if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
           this.defaultLabelExpression, null)) {
         throw new IOException(
@@ -497,6 +503,16 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
+  @Private
+  public List<AppPriorityACLGroup> getPriorityACLs() {
+    try {
+      readLock.lock();
+      return new ArrayList<>(priorityAcls);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public void reinitialize(
       CSQueue newlyParsedQueue, Resource clusterResource) 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java
new file mode 100644
index 0000000..c1fd0a6
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java
@@ -0,0 +1,230 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLGroup;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ *
+ * Manager class to store and check permission for Priority ACLs.
+ */
+public class AppPriorityACLsManager {
+
+  private static final Log LOG = LogFactory
+      .getLog(AppPriorityACLsManager.class);
+
+  /*
+   * An internal class to store ACLs specific to each priority. This will be
+   * used to read and process acl's during app submission time as well.
+   */
+  private static class PriorityACL {
+    private Priority priority;
+    private Priority defaultPriority;
+    private AccessControlList acl;
+
+    PriorityACL(Priority priority, Priority defaultPriority,
+        AccessControlList acl) {
+      this.setPriority(priority);
+      this.setDefaultPriority(defaultPriority);
+      this.setAcl(acl);
+    }
+
+    public Priority getPriority() {
+      return priority;
+    }
+
+    public void setPriority(Priority maxPriority) {
+      this.priority = maxPriority;
+    }
+
+    public Priority getDefaultPriority() {
+      return defaultPriority;
+    }
+
+    public void setDefaultPriority(Priority defaultPriority) {
+      this.defaultPriority = defaultPriority;
+    }
+
+    public AccessControlList getAcl() {
+      return acl;
+    }
+
+    public void setAcl(AccessControlList acl) {
+      this.acl = acl;
+    }
+  }
+
+  private boolean isACLsEnable;
+  private final ConcurrentMap<String, List<PriorityACL>> allAcls =
+      new ConcurrentHashMap<>();
+
+  public AppPriorityACLsManager(Configuration conf) {
+    this.isACLsEnable = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
+        YarnConfiguration.DEFAULT_YARN_ACL_ENABLE);
+  }
+
+  /**
+   * Clear priority acl during refresh.
+   *
+   * @param queueName
+   *          Queue Name
+   */
+  public void clearPriorityACLs(String queueName) {
+    allAcls.remove(queueName);
+  }
+
+  /**
+   * Each Queue could have configured with different priority acl's groups. 
This
+   * method helps to store each such ACL list against queue.
+   *
+   * @param priorityACLGroups
+   *          List of Priority ACL Groups.
+   * @param queueName
+   *          Queue Name associate with priority acl groups.
+   */
+  public void addPrioirityACLs(List<AppPriorityACLGroup> priorityACLGroups,
+      String queueName) {
+
+    List<PriorityACL> priorityACL = allAcls.get(queueName);
+    if (null == priorityACL) {
+      priorityACL = new ArrayList<PriorityACL>();
+      allAcls.put(queueName, priorityACL);
+    }
+
+    // Ensure lowest priority PriorityACLGroup comes first in the list.
+    Collections.sort(priorityACLGroups);
+
+    for (AppPriorityACLGroup priorityACLGroup : priorityACLGroups) {
+      priorityACL.add(new PriorityACL(priorityACLGroup.getMaxPriority(),
+          priorityACLGroup.getDefaultPriority(),
+          priorityACLGroup.getACLList()));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Priority ACL group added: max-priority - "
+            + priorityACLGroup.getMaxPriority() + "default-priority - "
+            + priorityACLGroup.getDefaultPriority());
+      }
+    }
+  }
+
+  /**
+   * Priority based checkAccess to ensure that given user has enough permission
+   * to submit application at a given priority level.
+   *
+   * @param callerUGI
+   *          User who submits the application.
+   * @param queueName
+   *          Queue to which application is submitted.
+   * @param submittedPriority
+   *          priority of the application.
+   * @return True or False to indicate whether application can be submitted at
+   *         submitted priority level or not.
+   */
+  public boolean checkAccess(UserGroupInformation callerUGI, String queueName,
+      Priority submittedPriority) {
+    if (!isACLsEnable) {
+      return true;
+    }
+
+    List<PriorityACL> acls = allAcls.get(queueName);
+    if (acls == null || acls.isEmpty()) {
+      return true;
+    }
+
+    PriorityACL approvedPriorityACL = getMappedPriorityAclForUGI(acls,
+        callerUGI, submittedPriority);
+    if (approvedPriorityACL == null) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * If an application is submitted without any priority, and submitted user 
has
+   * a default priority, this method helps to update this default priority as
+   * app's priority.
+   *
+   * @param queueName
+   *          Submitted queue
+   * @param user
+   *          User who submitted this application
+   * @return Default priority associated with given user.
+   */
+  public Priority getDefaultPriority(String queueName,
+      UserGroupInformation user) {
+    if (!isACLsEnable) {
+      return null;
+    }
+
+    List<PriorityACL> acls = allAcls.get(queueName);
+    if (acls == null || acls.isEmpty()) {
+      return null;
+    }
+
+    PriorityACL approvedPriorityACL = getMappedPriorityAclForUGI(acls, user,
+        null);
+    if (approvedPriorityACL == null) {
+      return null;
+    }
+
+    Priority defaultPriority = Priority
+        .newInstance(approvedPriorityACL.getDefaultPriority().getPriority());
+    return defaultPriority;
+  }
+
+  private PriorityACL getMappedPriorityAclForUGI(List<PriorityACL> acls ,
+      UserGroupInformation user, Priority submittedPriority) {
+
+    // Iterate through all configured ACLs starting from lower priority.
+    // If user is found corresponding to a configured priority, then store
+    // that entry. if failed, continue iterate through whole acl list.
+    PriorityACL selectedAcl = null;
+    for (PriorityACL entry : acls) {
+      AccessControlList list = entry.getAcl();
+
+      if (list.isUserAllowed(user)) {
+        selectedAcl = entry;
+
+        // If submittedPriority is passed through the argument, also check
+        // whether submittedPriority is under max-priority of each ACL group.
+        if (submittedPriority != null) {
+          selectedAcl = null;
+          if (submittedPriority.getPriority() <= entry.getPriority()
+              .getPriority()) {
+            return entry;
+          }
+        }
+      }
+    }
+    return selectedAcl;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
index e661703..fbd5ac3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
@@ -43,6 +43,8 @@ public abstract class ACLsTestBase {
   protected static final String COMMON_USER = "common_user";
   protected static final String QUEUE_A_USER = "queueA_user";
   protected static final String QUEUE_B_USER = "queueB_user";
+  protected static final String QUEUE_A_GROUP = "queueA_group";
+  protected static final String QUEUE_B_GROUP = "queueB_group";
   protected static final String ROOT_ADMIN = "root_admin";
   protected static final String QUEUE_A_ADMIN = "queueA_admin";
   protected static final String QUEUE_B_ADMIN = "queueB_admin";
@@ -53,7 +55,7 @@ public abstract class ACLsTestBase {
 
   protected static final Log LOG = 
LogFactory.getLog(TestApplicationACLs.class);
 
-  MockRM resourceManager;
+  protected MockRM resourceManager;
   Configuration conf;
   YarnRPC rpc;
   InetSocketAddress rmAddress;
@@ -68,6 +70,7 @@ public abstract class ACLsTestBase {
 
     AccessControlList adminACL = new AccessControlList("");
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString());
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
 
     resourceManager = new MockRM(conf) {
       protected ClientRMService createClientRMService() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index 00466ae..23bed22 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -463,7 +464,9 @@ public class TestApplicationMasterService {
 
     // Change the priority of App1 to 8
     Priority appPriority2 = Priority.newInstance(8);
-    rm.getRMAppManager().updateApplicationPriority(app1.getApplicationId(),
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser(app1.getUser());
+    rm.getRMAppManager().updateApplicationPriority(ugi, 
app1.getApplicationId(),
         appPriority2);
 
     AllocateResponse response2 = am1.allocate(allocateRequest);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index c135384..5ecba12 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -1123,7 +1123,7 @@ public class TestClientRMService {
     when(yarnScheduler.getResourceCalculator()).thenReturn(rs);
 
     when(yarnScheduler.checkAndGetApplicationPriority(any(Priority.class),
-        anyString(), anyString(), any(ApplicationId.class)))
+        any(UserGroupInformation.class), anyString(), 
any(ApplicationId.class)))
             .thenReturn(Priority.newInstance(0));
     return yarnScheduler;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index 164ca20..ff52efd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -344,7 +345,10 @@ public class TestApplicationPriority {
 
     // Change the priority of App1 to 8
     Priority appPriority2 = Priority.newInstance(8);
-    cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null);
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser(app1.getUser());
+    cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null,
+        ugi);
 
     // get scheduler app
     FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
@@ -378,7 +382,10 @@ public class TestApplicationPriority {
 
     // Change the priority of App1 to 15
     Priority appPriority2 = Priority.newInstance(15);
-    cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null);
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser(app1.getUser());
+    cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null,
+        ugi);
 
     // get scheduler app
     FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
@@ -428,7 +435,10 @@ public class TestApplicationPriority {
 
     // Change the priority of App1 to 8
     Priority appPriority2 = Priority.newInstance(8);
-    cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null);
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser(app1.getUser());
+    cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null,
+        ugi);
 
     // let things settle down
     Thread.sleep(1000);
@@ -557,7 +567,10 @@ public class TestApplicationPriority {
 
     // Change the priority of App1 to 3 (lowest)
     Priority appPriority3 = Priority.newInstance(3);
-    cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null);
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser(app2.getUser());
+    cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null,
+        ugi);
 
     // add request for containers App2
     am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList<ContainerId>());
@@ -788,8 +801,10 @@ public class TestApplicationPriority {
       int appsPendingExpected, int activeAppsExpected, RMApp app)
       throws YarnException {
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser(app.getUser());
     cs.updateApplicationPriority(Priority.newInstance(2),
-        app.getApplicationId(), null);
+        app.getApplicationId(), null, ugi);
     SchedulerEvent removeAttempt;
     removeAttempt = new AppAttemptRemovedSchedulerEvent(
         app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java
new file mode 100644
index 0000000..598bd49
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java
@@ -0,0 +1,120 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestApplicationPriorityACLConfiguration {
+
+  private final int defaultPriorityQueueA = 3;
+  private final int defaultPriorityQueueB = -1;
+  private final int maxPriorityQueueA = 5;
+  private final int maxPriorityQueueB = 10;
+  private final int clusterMaxPriority = 10;
+
+  private static final String QUEUE_A_USER = "queueA_user";
+  private static final String QUEUE_B_USER = "queueB_user";
+  private static final String QUEUE_A_GROUP = "queueA_group";
+
+  private static final String QUEUEA = "queueA";
+  private static final String QUEUEB = "queueB";
+  private static final String QUEUEC = "queueC";
+
+  @Test
+  public void testSimpleACLConfiguration() throws Exception {
+    CapacitySchedulerConfiguration csConf = new 
CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[]{QUEUEA, QUEUEB, QUEUEC});
+
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 
50f);
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 
25f);
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 
25f);
+
+    // Success case: Configure one user/group level priority acl for queue A.
+    String[] aclsForA = new String[2];
+    aclsForA[0] = QUEUE_A_USER;
+    aclsForA[1] = QUEUE_A_GROUP;
+    csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA,
+        Priority.newInstance(maxPriorityQueueA),
+        Priority.newInstance(defaultPriorityQueueA), aclsForA);
+
+    // Try to get the ACL configs and make sure there are errors/exceptions
+    List<AppPriorityACLGroup> pGroupA = csConf.getPriorityAcls(
+        CapacitySchedulerConfiguration.ROOT + "." + QUEUEA,
+        Priority.newInstance(clusterMaxPriority));
+
+    // Validate!
+    verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA,
+        defaultPriorityQueueA);
+  }
+
+  @Test
+  public void testACLConfigurationForInvalidCases() throws Exception {
+    CapacitySchedulerConfiguration csConf = new 
CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[]{QUEUEA, QUEUEB, QUEUEC});
+
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 
50f);
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 
25f);
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 
25f);
+
+    // Success case: Configure one user/group level priority acl for queue A.
+    String[] aclsForA = new String[2];
+    aclsForA[0] = QUEUE_A_USER;
+    aclsForA[1] = QUEUE_A_GROUP;
+    csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA,
+        Priority.newInstance(maxPriorityQueueA),
+        Priority.newInstance(defaultPriorityQueueA), aclsForA);
+
+    String[] aclsForB = new String[1];
+    aclsForB[0] = QUEUE_B_USER;
+    csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB,
+        Priority.newInstance(maxPriorityQueueB),
+        Priority.newInstance(defaultPriorityQueueB), aclsForB);
+
+    // Try to get the ACL configs and make sure there are errors/exceptions
+    List<AppPriorityACLGroup> pGroupA = csConf.getPriorityAcls(
+        CapacitySchedulerConfiguration.ROOT + "." + QUEUEA,
+        Priority.newInstance(clusterMaxPriority));
+    List<AppPriorityACLGroup> pGroupB = csConf.getPriorityAcls(
+        CapacitySchedulerConfiguration.ROOT + "." + QUEUEB,
+        Priority.newInstance(clusterMaxPriority));
+
+    // Validate stored ACL values with configured ones.
+    verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA,
+        defaultPriorityQueueA);
+    verifyACLs(pGroupB, QUEUE_B_USER, "", maxPriorityQueueB, 0);
+  }
+
+  private void verifyACLs(List<AppPriorityACLGroup> pGroup, String queueUser,
+      String queueGroup, int maxPriority, int defaultPriority) {
+    AppPriorityACLGroup group = pGroup.get(0);
+    String aclString = queueUser + " " + queueGroup;
+
+    Assert.assertEquals(aclString.trim(),
+        group.getACLList().getAclString().trim());
+    Assert.assertEquals(maxPriority, group.getMaxPriority().getPriority());
+    Assert.assertEquals(defaultPriority,
+        group.getDefaultPriority().getPriority());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java
new file mode 100644
index 0000000..b41ba83
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java
@@ -0,0 +1,206 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ACLsTestBase;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestApplicationPriorityACLs extends ACLsTestBase {
+
+  private final int defaultPriorityQueueA = 3;
+  private final int defaultPriorityQueueB = 10;
+  private final int maxPriorityQueueA = 5;
+  private final int maxPriorityQueueB = 11;
+  private final int clusterMaxPriority = 10;
+
+  @Test
+  public void testApplicationACLs() throws Exception {
+
+    /*
+     * Cluster Max-priority is 10. User 'queueA_user' has permission to submit
+     * apps only at priority 5. Default priority for this user is 3.
+     */
+
+    // Case 1: App will be submitted with priority 5.
+    verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, 5);
+
+    // Case 2: App will be rejected as submitted priority was 6.
+    verifyAppSubmitWithPriorityFailure(QUEUE_A_USER, QUEUEA, 6);
+
+    // Case 3: App will be submitted w/o priority, hence consider default 3.
+    verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, -1);
+
+    // Case 4: App will be submitted with priority 11.
+    verifyAppSubmitWithPrioritySuccess(QUEUE_B_USER, QUEUEB, 11);
+  }
+
+  private void verifyAppSubmitWithPrioritySuccess(String submitter,
+      String queueName, int priority) throws Exception {
+    Priority appPriority = null;
+    if (priority > 0) {
+      appPriority = Priority.newInstance(priority);
+    } else {
+      // RM will consider default priority for the submitted user. So update
+      // priority to the default value to compare.
+      priority = defaultPriorityQueueA;
+    }
+
+    ApplicationSubmissionContext submissionContext = prepareForAppSubmission(
+        submitter, queueName, appPriority);
+    submitAppToRMWithValidAcl(submitter, submissionContext);
+
+    // Ideally get app report here and check the priority.
+    verifyAppPriorityIsAccepted(submitter, 
submissionContext.getApplicationId(),
+        priority);
+  }
+
+  private void verifyAppSubmitWithPriorityFailure(String submitter,
+      String queueName, int priority) throws Exception {
+    Priority appPriority = Priority.newInstance(priority);
+    ApplicationSubmissionContext submissionContext = prepareForAppSubmission(
+        submitter, queueName, appPriority);
+    submitAppToRMWithInValidAcl(submitter, submissionContext);
+  }
+
+  private ApplicationSubmissionContext prepareForAppSubmission(String 
submitter,
+      String queueName, Priority priority) throws Exception {
+
+    GetNewApplicationRequest newAppRequest = GetNewApplicationRequest
+        .newInstance();
+
+    ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
+    ApplicationId applicationId = submitterClient
+        .getNewApplication(newAppRequest).getApplicationId();
+
+    Resource resource = BuilderUtils.newResource(1024, 1);
+
+    ContainerLaunchContext amContainerSpec = ContainerLaunchContext
+        .newInstance(null, null, null, null, null, null);
+    ApplicationSubmissionContext appSubmissionContext = 
ApplicationSubmissionContext
+        .newInstance(applicationId, "applicationName", queueName, null,
+            amContainerSpec, false, true, 1, resource, "applicationType");
+    appSubmissionContext.setApplicationId(applicationId);
+    appSubmissionContext.setQueue(queueName);
+    if (null != priority) {
+      appSubmissionContext.setPriority(priority);
+    }
+
+    return appSubmissionContext;
+  }
+
+  private void submitAppToRMWithValidAcl(String submitter,
+      ApplicationSubmissionContext appSubmissionContext)
+      throws YarnException, IOException, InterruptedException {
+    ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
+    SubmitApplicationRequest submitRequest = SubmitApplicationRequest
+        .newInstance(appSubmissionContext);
+    submitterClient.submitApplication(submitRequest);
+    resourceManager.waitForState(appSubmissionContext.getApplicationId(),
+        RMAppState.ACCEPTED);
+  }
+
+  private void submitAppToRMWithInValidAcl(String submitter,
+      ApplicationSubmissionContext appSubmissionContext)
+      throws YarnException, IOException, InterruptedException {
+    ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
+    SubmitApplicationRequest submitRequest = SubmitApplicationRequest
+        .newInstance(appSubmissionContext);
+    try {
+      submitterClient.submitApplication(submitRequest);
+    } catch (YarnException ex) {
+      Assert.assertTrue(ex.getCause() instanceof RemoteException);
+    }
+  }
+
+  private void verifyAppPriorityIsAccepted(String submitter,
+      ApplicationId applicationId, int priority)
+      throws IOException, InterruptedException {
+    ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
+
+    /**
+     * If priority is greater than cluster max, RM will auto set to cluster max
+     * Consider this scenario as a special case.
+     */
+    if (priority > clusterMaxPriority) {
+      priority = clusterMaxPriority;
+    }
+
+    GetApplicationReportRequest request = GetApplicationReportRequest
+        .newInstance(applicationId);
+    try {
+      GetApplicationReportResponse response = submitterClient
+          .getApplicationReport(request);
+      Assert.assertEquals(response.getApplicationReport().getPriority(),
+          Priority.newInstance(priority));
+    } catch (YarnException e) {
+      Assert.fail("Application submission should not fail.");
+    }
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    CapacitySchedulerConfiguration csConf = new 
CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[]{QUEUEA, QUEUEB, QUEUEC});
+
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 
50f);
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 
25f);
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 
25f);
+
+    String[] aclsForA = new String[2];
+    aclsForA[0] = QUEUE_A_USER;
+    aclsForA[1] = QUEUE_A_GROUP;
+    csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA,
+        Priority.newInstance(maxPriorityQueueA),
+        Priority.newInstance(defaultPriorityQueueA), aclsForA);
+
+    String[] aclsForB = new String[2];
+    aclsForB[0] = QUEUE_B_USER;
+    aclsForB[1] = QUEUE_B_GROUP;
+    csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB,
+        Priority.newInstance(maxPriorityQueueB),
+        Priority.newInstance(defaultPriorityQueueB), aclsForB);
+
+    csConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
+    csConf.set(YarnConfiguration.RM_SCHEDULER,
+        CapacityScheduler.class.getName());
+
+    return csConf;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to