YARN-3955. Support for application priority ACLs in queues of CapacityScheduler. (Sunil G via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/287d3d68 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/287d3d68 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/287d3d68 Branch: refs/heads/HADOOP-13345 Commit: 287d3d6804a869723ae36605a3c2d2b3eae3941e Parents: db490ec Author: Wangda Tan <wan...@apache.org> Authored: Mon Jan 9 08:40:39 2017 -0800 Committer: Wangda Tan <wan...@apache.org> Committed: Mon Jan 9 08:40:39 2017 -0800 ---------------------------------------------------------------------- .../sls/scheduler/ResourceSchedulerWrapper.java | 2 +- .../apache/hadoop/yarn/security/AccessType.java | 2 + .../conf/capacity-scheduler.xml | 9 + .../server/resourcemanager/ClientRMService.java | 3 +- .../server/resourcemanager/RMAppManager.java | 16 +- .../scheduler/AbstractYarnScheduler.java | 10 +- .../scheduler/YarnScheduler.java | 11 +- .../AppPriorityACLConfigurationParser.java | 219 ++++++++++++++++++ .../scheduler/capacity/AppPriorityACLGroup.java | 108 +++++++++ .../scheduler/capacity/CapacityScheduler.java | 153 +++++++----- .../CapacitySchedulerConfiguration.java | 44 +++- .../capacity/CapacitySchedulerContext.java | 7 + .../capacity/CapacitySchedulerQueueManager.java | 23 +- .../scheduler/capacity/LeafQueue.java | 16 ++ .../security/AppPriorityACLsManager.java | 230 +++++++++++++++++++ .../server/resourcemanager/ACLsTestBase.java | 5 +- .../TestApplicationMasterService.java | 5 +- .../resourcemanager/TestClientRMService.java | 2 +- .../capacity/TestApplicationPriority.java | 25 +- ...TestApplicationPriorityACLConfiguration.java | 120 ++++++++++ .../capacity/TestApplicationPriorityACLs.java | 206 +++++++++++++++++ .../scheduler/capacity/TestParentQueue.java | 6 +- 22 files changed, 1127 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index e66de2f..5517362 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -963,7 +963,7 @@ final public class ResourceSchedulerWrapper @Override public Priority checkAndGetApplicationPriority(Priority priority, - String user, String queueName, ApplicationId applicationId) + UserGroupInformation user, String queueName, ApplicationId applicationId) throws YarnException { // TODO Dummy implementation. 
return Priority.newInstance(0); http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java index 32459b9..fb4484b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java @@ -30,4 +30,6 @@ public enum AccessType { // queue SUBMIT_APP, ADMINISTER_QUEUE, + // application + APPLICATION_MAX_PRIORITY, } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml index 6ac726e..47db01f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml @@ -98,6 +98,15 @@ </property> <property> + <name>yarn.scheduler.capacity.root.default.acl_application_max_priority</name> + <value>*</value> + <description> + The ACL of who can submit applications with configured priority. 
+ For e.g, [user={name} group={name} max_priority={priority} default_priority={priority}] + </description> + </property> + + <property> <name>yarn.scheduler.capacity.node-locality-delay</name> <value>40</value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index cdf30a1..add522b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -1614,7 +1614,8 @@ public class ClientRMService extends AbstractService implements } try { - rmAppManager.updateApplicationPriority(applicationId, newAppPriority); + rmAppManager.updateApplicationPriority(callerUGI, applicationId, + newAppPriority); } catch (YarnException ex) { RMAuditLogger.logFailure(callerUGI.getShortUserName(), AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 8232b88..47eec54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -353,19 +353,19 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>, RMServerUtils.validateApplicationTimeouts( submissionContext.getApplicationTimeouts()); } - + ApplicationId applicationId = submissionContext.getApplicationId(); ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext, isRecovery); // Verify and get the update application priority and set back to // submissionContext + UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); Priority appPriority = scheduler.checkAndGetApplicationPriority( - submissionContext.getPriority(), user, submissionContext.getQueue(), + submissionContext.getPriority(), userUgi, submissionContext.getQueue(), applicationId); submissionContext.setPriority(appPriority); - UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); // Since FairScheduler queue mapping is done inside scheduler, // if FairScheduler is used and the queue doesn't exist, we should not // fail here because queue will be created inside FS. Ideally, FS queue @@ -569,12 +569,14 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>, /** * updateApplicationPriority will invoke scheduler api to update the * new priority to RM and StateStore. 
+ * @param callerUGI user * @param applicationId Application Id * @param newAppPriority proposed new application priority * @throws YarnException Handle exceptions */ - public void updateApplicationPriority(ApplicationId applicationId, - Priority newAppPriority) throws YarnException { + public void updateApplicationPriority(UserGroupInformation callerUGI, + ApplicationId applicationId, Priority newAppPriority) + throws YarnException { RMApp app = this.rmContext.getRMApps().get(applicationId); synchronized (applicationId) { @@ -587,8 +589,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>, // Invoke scheduler api to update priority in scheduler and to // State Store. - Priority appPriority = rmContext.getScheduler() - .updateApplicationPriority(newAppPriority, applicationId, future); + Priority appPriority = rmContext.getScheduler().updateApplicationPriority( + newAppPriority, applicationId, future, callerUGI); if (app.getApplicationPriority().equals(appPriority)) { return; http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 89f9ffa..9ff0b24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -788,9 +789,9 @@ public abstract class AbstractYarnScheduler } @Override - public Priority checkAndGetApplicationPriority(Priority priorityFromContext, - String user, String queueName, ApplicationId applicationId) - throws YarnException { + public Priority checkAndGetApplicationPriority( + Priority priorityRequestedByApp, UserGroupInformation user, + String queueName, ApplicationId applicationId) throws YarnException { // Dummy Implementation till Application Priority changes are done in // specific scheduler. return Priority.newInstance(0); @@ -798,7 +799,8 @@ public abstract class AbstractYarnScheduler @Override public Priority updateApplicationPriority(Priority newPriority, - ApplicationId applicationId, SettableFuture<Object> future) + ApplicationId applicationId, SettableFuture<Object> future, + UserGroupInformation user) throws YarnException { // Dummy Implementation till Application Priority changes are done in // specific scheduler. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 5579efd..08e0603 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -307,7 +307,7 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> { * Verify whether a submitted application priority is valid as per configured * Queue * - * @param priorityFromContext + * @param priorityRequestedByApp * Submitted Application priority. 
* @param user * User who submitted the Application @@ -317,8 +317,8 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> { * Application ID * @return Updated Priority from scheduler */ - public Priority checkAndGetApplicationPriority(Priority priorityFromContext, - String user, String queueName, ApplicationId applicationId) + public Priority checkAndGetApplicationPriority(Priority priorityRequestedByApp, + UserGroupInformation user, String queueName, ApplicationId applicationId) throws YarnException; /** @@ -330,12 +330,13 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> { * @param applicationId Application ID * * @param future Sets any type of exception happened from StateStore + * @param user who submitted the application * * @return updated priority */ public Priority updateApplicationPriority(Priority newPriority, - ApplicationId applicationId, SettableFuture<Object> future) - throws YarnException; + ApplicationId applicationId, SettableFuture<Object> future, + UserGroupInformation user) throws YarnException; /** * http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLConfigurationParser.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLConfigurationParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLConfigurationParser.java new file mode 100644 index 0000000..4489a01 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLConfigurationParser.java @@ -0,0 +1,219 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Priority; + +/** + * + * AppPriorityACLConfigurationParser class is used to parse Application Priority ACL + * configuration from capacity-scheduler.xml + */ +public class AppPriorityACLConfigurationParser { + + private static final Log LOG = LogFactory + .getLog(AppPriorityACLConfigurationParser.class); + + public enum AppPriorityACLKeyType { + USER(1), GROUP(2), MAX_PRIORITY(3), DEFAULT_PRIORITY(4); + + private final int id; + + AppPriorityACLKeyType(int id) { + this.id = id; + } + + public int getId() { + return this.id; + } + } + + public static final String PATTERN_FOR_PRIORITY_ACL = "\\[([^\\]]+)"; + + @Private + public static final String ALL_ACL = "*"; + + @Private + public static final String NONE_ACL = " "; + + public List<AppPriorityACLGroup> getPriorityAcl(Priority clusterMaxPriority, + String aclString) { + + List<AppPriorityACLGroup> aclList = new ArrayList<AppPriorityACLGroup>(); + Matcher matcher = Pattern.compile(PATTERN_FOR_PRIORITY_ACL) + .matcher(aclString); + + /* + * Each ACL group will be separated by "[]". Syntax of each ACL group could + * be like below "user=b1,b2 group=g1 max_priority=a2 default_priority=a1" + * Ideally this means "for this given user/group, maximum possible priority + * is a2 and if the user has not specified any priority, then it is a1." + */ + while (matcher.find()) { + // Get the first ACL sub-group. 
+ String aclSubGroup = matcher.group(1); + if (aclSubGroup.trim().isEmpty()) { + continue; + } + + /* + * Internal storage is PriorityACLGroup which stores each parsed priority + * ACLs group. This will help while looking for a user to priority mapping + * during app submission time. ACLs will be passed in below order only. 1. + * user/group 2. max-priority 3. default-priority + */ + AppPriorityACLGroup userPriorityACL = new AppPriorityACLGroup(); + + // userAndGroupName will hold user acl and group acl as interim storage + // since both user/group acl comes with separate key value pairs. + List<StringBuilder> userAndGroupName = new ArrayList<>(); + + for (String kvPair : aclSubGroup.trim().split(" +")) { + /* + * There are 3 possible options for key here: 1. user/group 2. + * max-priority 3. default-priority + */ + String[] splits = kvPair.split("="); + + // Ensure that each ACL sub string is key value pair separated by '='. + if (splits != null && splits.length > 1) { + parsePriorityACLType(userPriorityACL, splits, userAndGroupName); + } + } + + // If max_priority is higher to clusterMaxPriority, its better to + // handle here. + if (userPriorityACL.getMaxPriority().getPriority() > clusterMaxPriority + .getPriority()) { + LOG.warn("ACL configuration for '" + userPriorityACL.getMaxPriority() + + "' is greater that cluster max priority. Resetting ACLs to " + + clusterMaxPriority); + userPriorityACL.setMaxPriority( + Priority.newInstance(clusterMaxPriority.getPriority())); + } + + AccessControlList acl = createACLStringForPriority(userAndGroupName); + userPriorityACL.setACLList(acl); + aclList.add(userPriorityACL); + } + + return aclList; + } + + /* + * Parse different types of ACLs sub parts for on priority group and store in + * a map for later processing. 
+ */ + private void parsePriorityACLType(AppPriorityACLGroup userPriorityACL, + String[] splits, List<StringBuilder> userAndGroupName) { + // Here splits will have the key value pair at index 0 and 1 respectively. + // To parse all keys, its better to convert to PriorityACLConfig enum. + AppPriorityACLKeyType aclType = AppPriorityACLKeyType + .valueOf(StringUtils.toUpperCase(splits[0].trim())); + switch (aclType) { + case MAX_PRIORITY : + userPriorityACL + .setMaxPriority(Priority.newInstance(Integer.parseInt(splits[1]))); + break; + case USER : + userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1])); + break; + case GROUP : + userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1])); + break; + case DEFAULT_PRIORITY : + int defaultPriority = Integer.parseInt(splits[1]); + Priority priority = (defaultPriority < 0) + ? Priority.newInstance(0) + : Priority.newInstance(defaultPriority); + userPriorityACL.setDefaultPriority(priority); + break; + default: + break; + } + } + + /* + * This method will help to append different types of ACLs keys against one + * priority. For eg,USER will be appended with GROUP as "user2,user4 group1". + */ + private AccessControlList createACLStringForPriority( + List<StringBuilder> acls) { + + String finalACL = ""; + String userACL = acls.get(0).toString(); + + // If any of user/group is *, consider it as acceptable for all. + // "user" is at index 0, and "group" is at index 1. + if (userACL.trim().equals(ALL_ACL)) { + finalACL = ALL_ACL; + } else if (userACL.equals(NONE_ACL)) { + finalACL = NONE_ACL; + } else { + + // Get USER segment + if (!userACL.trim().isEmpty()) { + // skip last appended "," + finalACL = acls.get(0).toString(); + } + + // Get GROUP segment if any + if (acls.size() > 1) { + String groupACL = acls.get(1).toString(); + if (!groupACL.trim().isEmpty()) { + finalACL = finalACL + " " + + acls.get(1).toString(); + } + } + } + + // Here ACL will look like "user1,user2 group" in ideal cases. 
+ return new AccessControlList(finalACL.trim()); + } + + /* + * This method will help to append user/group acl string against given + * priority. For example "user1,user2 group1,group2" + */ + private StringBuilder getUserOrGroupACLStringFromConfig(String value) { + + // ACL strings could be generated for USER or GROUP. + // aclList in map contains two entries. 1. USER, 2. GROUP. + StringBuilder aclTypeName = new StringBuilder(); + + if (value.trim().equals(ALL_ACL)) { + aclTypeName.setLength(0); + aclTypeName.append(ALL_ACL); + return aclTypeName; + } + + aclTypeName.append(value.trim()); + return aclTypeName; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLGroup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLGroup.java new file mode 100644 index 0000000..cb5ebcb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLGroup.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; + +/** + * PriorityACLGroup will hold all ACL related information per priority. + * + */ +public class AppPriorityACLGroup implements Comparable<AppPriorityACLGroup> { + + private Priority maxPriority = null; + private Priority defaultPriority = null; + private AccessControlList aclList = null; + + public AppPriorityACLGroup(Priority maxPriority, Priority defaultPriority, + AccessControlList aclList) { + this.setMaxPriority(Priority.newInstance(maxPriority.getPriority())); + this.setDefaultPriority( + Priority.newInstance(defaultPriority.getPriority())); + this.setACLList(aclList); + } + + public AppPriorityACLGroup() { + } + + @Override + public int compareTo(AppPriorityACLGroup o) { + return getMaxPriority().compareTo(o.getMaxPriority()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + AppPriorityACLGroup other = (AppPriorityACLGroup) obj; + if (getMaxPriority() != other.getMaxPriority()) { + return false; + } + + if (getDefaultPriority() != other.getDefaultPriority()) { + return false; + } + + return true; + } + + @Override + public int 
hashCode() { + final int prime = 517861; + int result = 9511; + result = prime * result + getMaxPriority().getPriority(); + result = prime * result + getDefaultPriority().getPriority(); + return result; + } + + public Priority getMaxPriority() { + return maxPriority; + } + + public Priority getDefaultPriority() { + return defaultPriority; + } + + public AccessControlList getACLList() { + return aclList; + } + + public void setMaxPriority(Priority maxPriority) { + this.maxPriority = maxPriority; + } + + public void setDefaultPriority(Priority defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public void setACLList(AccessControlList accessControlList) { + this.aclList = accessControlList; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 55ffe25..c475967 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -137,6 +137,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -227,6 +228,7 @@ public class CapacityScheduler extends private List<AsyncScheduleThread> asyncSchedulerThreads; private ResourceCommitterService resourceCommitterService; private RMNodeLabelsManager labelManager; + private AppPriorityACLsManager appPriorityACLManager; /** * EXPERT @@ -308,8 +310,9 @@ public class CapacityScheduler extends this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.applications = new ConcurrentHashMap<>(); this.labelManager = rmContext.getNodeLabelManager(); + this.appPriorityACLManager = new AppPriorityACLsManager(conf); this.queueManager = new CapacitySchedulerQueueManager(yarnConf, - this.labelManager); + this.labelManager, this.appPriorityACLManager); this.queueManager.setCapacitySchedulerContext(this); this.activitiesManager = new ActivitiesManager(rmContext); @@ -2188,86 +2191,110 @@ public class CapacityScheduler extends } @Override - public Priority checkAndGetApplicationPriority(Priority priorityFromContext, - String user, String queueName, ApplicationId applicationId) - throws YarnException { - Priority appPriority = null; - - // ToDo: Verify against priority ACLs - - // Verify the scenario where priority is null from submissionContext. - if (null == priorityFromContext) { - // Get the default priority for the Queue. 
If Queue is non-existent, then - // use default priority - priorityFromContext = this.queueManager.getDefaultPriorityForQueue( - queueName); + public Priority checkAndGetApplicationPriority( + Priority priorityRequestedByApp, UserGroupInformation user, + String queueName, ApplicationId applicationId) throws YarnException { + try { + readLock.lock(); + Priority appPriority = priorityRequestedByApp; + + // Verify the scenario where priority is null from submissionContext. + if (null == appPriority) { + // Verify whether submitted user has any default priority set. If so, + // user's default priority will get precedence over queue default. + // for updateApplicationPriority call flow, this check is done in + // ClientRMService itself. + appPriority = this.appPriorityACLManager.getDefaultPriority(queueName, + user); + + // Get the default priority for the Queue. If Queue is non-existent, + // then + // use default priority. Do it only if user doesn't have any default. + if (null == appPriority) { + appPriority = this.queueManager.getDefaultPriorityForQueue(queueName); + } - LOG.info("Application '" + applicationId - + "' is submitted without priority " - + "hence considering default queue/cluster priority: " - + priorityFromContext.getPriority()); - } + LOG.info( + "Application '" + applicationId + "' is submitted without priority " + + "hence considering default queue/cluster priority: " + + appPriority.getPriority()); + } - // Verify whether submitted priority is lesser than max priority - // in the cluster. If it is out of found, defining a max cap. - if (priorityFromContext.compareTo(getMaxClusterLevelAppPriority()) < 0) { - priorityFromContext = Priority - .newInstance(getMaxClusterLevelAppPriority().getPriority()); - } + // Verify whether submitted priority is lesser than max priority + // in the cluster. If it is out of bound, defining a max cap. 
+ if (appPriority.getPriority() > getMaxClusterLevelAppPriority() + .getPriority()) { + appPriority = Priority + .newInstance(getMaxClusterLevelAppPriority().getPriority()); + } - appPriority = priorityFromContext; + // Lets check for ACLs here. + if (!appPriorityACLManager.checkAccess(user, queueName, appPriority)) { + throw new YarnException(new AccessControlException( + "User " + user + " does not have permission to submit/update " + + applicationId + " for " + appPriority)); + } - LOG.info("Priority '" + appPriority.getPriority() - + "' is acceptable in queue : " + queueName + " for application: " - + applicationId + " for the user: " + user); + LOG.info("Priority '" + appPriority.getPriority() + + "' is acceptable in queue : " + queueName + " for application: " + + applicationId); - return appPriority; + return appPriority; + } finally { + readLock.unlock(); + } } @Override public Priority updateApplicationPriority(Priority newPriority, - ApplicationId applicationId, SettableFuture<Object> future) + ApplicationId applicationId, SettableFuture<Object> future, + UserGroupInformation user) throws YarnException { - Priority appPriority = null; - SchedulerApplication<FiCaSchedulerApp> application = applications - .get(applicationId); + try { + writeLock.lock(); + Priority appPriority = null; + SchedulerApplication<FiCaSchedulerApp> application = applications + .get(applicationId); - if (application == null) { - throw new YarnException("Application '" + applicationId - + "' is not present, hence could not change priority."); - } + if (application == null) { + throw new YarnException("Application '" + applicationId + + "' is not present, hence could not change priority."); + } - RMApp rmApp = rmContext.getRMApps().get(applicationId); + RMApp rmApp = rmContext.getRMApps().get(applicationId); - appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(), - rmApp.getQueue(), applicationId); + appPriority = checkAndGetApplicationPriority(newPriority, user, 
+ rmApp.getQueue(), applicationId); - if (application.getPriority().equals(appPriority)) { - future.set(null); - return appPriority; - } + if (application.getPriority().equals(appPriority)) { + future.set(null); + return appPriority; + } - // Update new priority in Submission Context to update to StateStore. - rmApp.getApplicationSubmissionContext().setPriority(appPriority); + // Update new priority in Submission Context to update to StateStore. + rmApp.getApplicationSubmissionContext().setPriority(appPriority); - // Update to state store - ApplicationStateData appState = ApplicationStateData.newInstance( - rmApp.getSubmitTime(), rmApp.getStartTime(), - rmApp.getApplicationSubmissionContext(), rmApp.getUser(), - rmApp.getCallerContext()); - appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); - rmContext.getStateStore().updateApplicationStateSynchronously(appState, - false, future); + // Update to state store + ApplicationStateData appState = ApplicationStateData.newInstance( + rmApp.getSubmitTime(), rmApp.getStartTime(), + rmApp.getApplicationSubmissionContext(), rmApp.getUser(), + rmApp.getCallerContext()); + appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); + rmContext.getStateStore().updateApplicationStateSynchronously(appState, + false, future); - // As we use iterator over a TreeSet for OrderingPolicy, once we change - // priority then reinsert back to make order correct. - LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); - queue.updateApplicationPriority(application, appPriority); + // As we use iterator over a TreeSet for OrderingPolicy, once we change + // priority then reinsert back to make order correct. 
+ LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); + queue.updateApplicationPriority(application, appPriority); - LOG.info("Priority '" + appPriority + "' is updated in queue :" - + rmApp.getQueue() + " for application: " + applicationId - + " for the user: " + rmApp.getUser()); - return appPriority; + LOG.info("Priority '" + appPriority + "' is updated in queue :" + + rmApp.getQueue() + " for application: " + applicationId + + " for the user: " + rmApp.getUser()); + return appPriority; + } finally { + writeLock.unlock(); + } } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index bfaeba4..eb148d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import 
org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ReservationACL; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -63,7 +65,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur private static final Log LOG = LogFactory.getLog(CapacitySchedulerConfiguration.class); - + private static final String CS_CONFIGURATION_FILE = "capacity-scheduler.xml"; @Private @@ -274,6 +276,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; + AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -602,6 +606,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return "acl_" + StringUtils.toLowerCase(acl.toString()); } + private static String getAclKey(AccessType acl) { + return "acl_" + StringUtils.toLowerCase(acl.toString()); + } + @Override public Map<ReservationACL, AccessControlList> getReservationAcls(String queue) { @@ -627,6 
+635,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur set(queuePrefix + getAclKey(acl), aclString); } + private void setAcl(String queue, AccessType acl, String aclString) { + String queuePrefix = getQueuePrefix(queue); + set(queuePrefix + getAclKey(acl), aclString); + } + public Map<AccessType, AccessControlList> getAcls(String queue) { Map<AccessType, AccessControlList> acls = new HashMap<AccessType, AccessControlList>(); @@ -650,6 +663,35 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur } } + @VisibleForTesting + public void setPriorityAcls(String queue, Priority priority, + Priority defaultPriority, String[] acls) { + StringBuilder aclString = new StringBuilder(); + + StringBuilder userAndGroup = new StringBuilder(); + for (int i = 0; i < acls.length; i++) { + userAndGroup.append(AppPriorityACLKeyType.values()[i] + "=" + acls[i].trim()) + .append(" "); + } + + aclString.append("[" + userAndGroup.toString().trim() + " " + + "max_priority=" + priority.getPriority() + " " + "default_priority=" + + defaultPriority.getPriority() + "]"); + + setAcl(queue, AccessType.APPLICATION_MAX_PRIORITY, aclString.toString()); + } + + public List<AppPriorityACLGroup> getPriorityAcls(String queue, + Priority clusterMaxPriority) { + String queuePrefix = getQueuePrefix(queue); + String defaultAcl = ALL_ACL; + String aclString = get( + queuePrefix + getAclKey(AccessType.APPLICATION_MAX_PRIORITY), + defaultAcl); + + return priorityACLConfig.getPriorityAcl(clusterMaxPriority, aclString); + } + public String[] getQueues(String queue) { LOG.debug("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue)); String[] queues = getStrings(getQueuePrefix(queue) + QUEUES); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 7d29619..504acb9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -23,6 +23,7 @@ import java.util.Comparator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; @@ -85,4 +86,10 @@ public interface CapacitySchedulerContext { ActivitiesManager getActivitiesManager(); CapacitySchedulerQueueManager getCapacitySchedulerQueueManager(); + + /** + * + * @return Max Cluster level App priority. 
+ */ + Priority getMaxClusterLevelAppPriority(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 6a3c08a..ddcbc0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; /** * @@ -86,6 +87,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< private final Map<String, CSQueue> queues = new ConcurrentHashMap<>(); private CSQueue root; private final RMNodeLabelsManager labelManager; + private AppPriorityACLsManager appPriorityACLManager; private QueueStateManager<CSQueue, 
CapacitySchedulerConfiguration> queueStateManager; @@ -94,12 +96,15 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< * Construct the service. * @param conf the configuration * @param labelManager the labelManager + * @param appPriorityACLManager App priority ACL manager */ public CapacitySchedulerQueueManager(Configuration conf, - RMNodeLabelsManager labelManager) { + RMNodeLabelsManager labelManager, + AppPriorityACLsManager appPriorityACLManager) { this.authorizer = YarnAuthorizationProvider.getInstance(conf); this.labelManager = labelManager; this.queueStateManager = new QueueStateManager<>(); + this.appPriorityACLManager = appPriorityACLManager; } @Override @@ -145,7 +150,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< throws IOException { root = parseQueue(this.csContext, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); - setQueueAcls(authorizer, queues); + setQueueAcls(authorizer, appPriorityACLManager, queues); labelManager.reinitializeQueueLabels(getQueueToLabels()); this.queueStateManager.initialize(this); LOG.info("Initialized root queue " + root); @@ -168,7 +173,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< // Re-configure queues root.reinitialize(newRoot, this.csContext.getClusterResource()); - setQueueAcls(authorizer, queues); + setQueueAcls(authorizer, appPriorityACLManager, queues); // Re-calculate headroom for active applications Resource clusterResource = this.csContext.getClusterResource(); @@ -305,12 +310,22 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< * @throws IOException if fails to set queue acls */ public static void setQueueAcls(YarnAuthorizationProvider authorizer, - Map<String, CSQueue> queues) throws IOException { + AppPriorityACLsManager appPriorityACLManager, Map<String, CSQueue> queues) + throws IOException { List<Permission> permissions = new ArrayList<>(); for (CSQueue queue : 
queues.values()) { AbstractCSQueue csQueue = (AbstractCSQueue) queue; permissions.add( new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs())); + + if (queue instanceof LeafQueue) { + LeafQueue lQueue = (LeafQueue) queue; + + // Clear Priority ACLs first since reinitialize also call same. + appPriorityACLManager.clearPriorityACLs(lQueue.getQueueName()); + appPriorityACLManager.addPrioirityACLs(lQueue.getPriorityACLs(), + lQueue.getQueueName()); + } } authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 18b38f4..6d4dbe0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -140,6 +140,9 @@ public class LeafQueue extends AbstractCSQueue { private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers = new ConcurrentHashMap<>(); + List<AppPriorityACLGroup> priorityAcls = + new ArrayList<AppPriorityACLGroup>(); + @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String 
queueName, CSQueue parent, CSQueue old) throws IOException { @@ -205,6 +208,9 @@ public class LeafQueue extends AbstractCSQueue { conf.getMaximumApplicationMasterResourcePerQueuePercent( getQueuePath()); + priorityAcls = conf.getPriorityAcls(getQueuePath(), + scheduler.getMaxClusterLevelAppPriority()); + if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, this.defaultLabelExpression, null)) { throw new IOException( @@ -497,6 +503,16 @@ public class LeafQueue extends AbstractCSQueue { } } + @Private + public List<AppPriorityACLGroup> getPriorityACLs() { + try { + readLock.lock(); + return new ArrayList<>(priorityAcls); + } finally { + readLock.unlock(); + } + } + @Override public void reinitialize( CSQueue newlyParsedQueue, Resource clusterResource) http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java new file mode 100644 index 0000000..c1fd0a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java @@ -0,0 +1,230 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLGroup; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * + * Manager class to store and check permission for Priority ACLs. + */ +public class AppPriorityACLsManager { + + private static final Log LOG = LogFactory + .getLog(AppPriorityACLsManager.class); + + /* + * An internal class to store ACLs specific to each priority. This will be + * used to read and process acl's during app submission time as well. 
+ */ + private static class PriorityACL { + private Priority priority; + private Priority defaultPriority; + private AccessControlList acl; + + PriorityACL(Priority priority, Priority defaultPriority, + AccessControlList acl) { + this.setPriority(priority); + this.setDefaultPriority(defaultPriority); + this.setAcl(acl); + } + + public Priority getPriority() { + return priority; + } + + public void setPriority(Priority maxPriority) { + this.priority = maxPriority; + } + + public Priority getDefaultPriority() { + return defaultPriority; + } + + public void setDefaultPriority(Priority defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public AccessControlList getAcl() { + return acl; + } + + public void setAcl(AccessControlList acl) { + this.acl = acl; + } + } + + private boolean isACLsEnable; + private final ConcurrentMap<String, List<PriorityACL>> allAcls = + new ConcurrentHashMap<>(); + + public AppPriorityACLsManager(Configuration conf) { + this.isACLsEnable = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, + YarnConfiguration.DEFAULT_YARN_ACL_ENABLE); + } + + /** + * Clear priority acl during refresh. + * + * @param queueName + * Queue Name + */ + public void clearPriorityACLs(String queueName) { + allAcls.remove(queueName); + } + + /** + * Each Queue could have configured with different priority acl's groups. This + * method helps to store each such ACL list against queue. + * + * @param priorityACLGroups + * List of Priority ACL Groups. + * @param queueName + * Queue Name associate with priority acl groups. + */ + public void addPrioirityACLs(List<AppPriorityACLGroup> priorityACLGroups, + String queueName) { + + List<PriorityACL> priorityACL = allAcls.get(queueName); + if (null == priorityACL) { + priorityACL = new ArrayList<PriorityACL>(); + allAcls.put(queueName, priorityACL); + } + + // Ensure lowest priority PriorityACLGroup comes first in the list. 
+ Collections.sort(priorityACLGroups); + + for (AppPriorityACLGroup priorityACLGroup : priorityACLGroups) { + priorityACL.add(new PriorityACL(priorityACLGroup.getMaxPriority(), + priorityACLGroup.getDefaultPriority(), + priorityACLGroup.getACLList())); + if (LOG.isDebugEnabled()) { + LOG.debug("Priority ACL group added: max-priority - " + + priorityACLGroup.getMaxPriority() + "default-priority - " + + priorityACLGroup.getDefaultPriority()); + } + } + } + + /** + * Priority based checkAccess to ensure that given user has enough permission + * to submit application at a given priority level. + * + * @param callerUGI + * User who submits the application. + * @param queueName + * Queue to which application is submitted. + * @param submittedPriority + * priority of the application. + * @return True or False to indicate whether application can be submitted at + * submitted priority level or not. + */ + public boolean checkAccess(UserGroupInformation callerUGI, String queueName, + Priority submittedPriority) { + if (!isACLsEnable) { + return true; + } + + List<PriorityACL> acls = allAcls.get(queueName); + if (acls == null || acls.isEmpty()) { + return true; + } + + PriorityACL approvedPriorityACL = getMappedPriorityAclForUGI(acls, + callerUGI, submittedPriority); + if (approvedPriorityACL == null) { + return false; + } + + return true; + } + + /** + * If an application is submitted without any priority, and submitted user has + * a default priority, this method helps to update this default priority as + * app's priority. + * + * @param queueName + * Submitted queue + * @param user + * User who submitted this application + * @return Default priority associated with given user. 
+ */ + public Priority getDefaultPriority(String queueName, + UserGroupInformation user) { + if (!isACLsEnable) { + return null; + } + + List<PriorityACL> acls = allAcls.get(queueName); + if (acls == null || acls.isEmpty()) { + return null; + } + + PriorityACL approvedPriorityACL = getMappedPriorityAclForUGI(acls, user, + null); + if (approvedPriorityACL == null) { + return null; + } + + Priority defaultPriority = Priority + .newInstance(approvedPriorityACL.getDefaultPriority().getPriority()); + return defaultPriority; + } + + private PriorityACL getMappedPriorityAclForUGI(List<PriorityACL> acls , + UserGroupInformation user, Priority submittedPriority) { + + // Iterate through all configured ACLs starting from lower priority. + // If user is found corresponding to a configured priority, then store + // that entry. if failed, continue iterate through whole acl list. + PriorityACL selectedAcl = null; + for (PriorityACL entry : acls) { + AccessControlList list = entry.getAcl(); + + if (list.isUserAllowed(user)) { + selectedAcl = entry; + + // If submittedPriority is passed through the argument, also check + // whether submittedPriority is under max-priority of each ACL group. 
+ if (submittedPriority != null) { + selectedAcl = null; + if (submittedPriority.getPriority() <= entry.getPriority() + .getPriority()) { + return entry; + } + } + } + } + return selectedAcl; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java index e661703..fbd5ac3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java @@ -43,6 +43,8 @@ public abstract class ACLsTestBase { protected static final String COMMON_USER = "common_user"; protected static final String QUEUE_A_USER = "queueA_user"; protected static final String QUEUE_B_USER = "queueB_user"; + protected static final String QUEUE_A_GROUP = "queueA_group"; + protected static final String QUEUE_B_GROUP = "queueB_group"; protected static final String ROOT_ADMIN = "root_admin"; protected static final String QUEUE_A_ADMIN = "queueA_admin"; protected static final String QUEUE_B_ADMIN = "queueB_admin"; @@ -53,7 +55,7 @@ public abstract class ACLsTestBase { protected static final Log LOG = LogFactory.getLog(TestApplicationACLs.class); - MockRM resourceManager; + protected MockRM resourceManager; Configuration conf; YarnRPC rpc; InetSocketAddress 
rmAddress; @@ -68,6 +70,7 @@ public abstract class ACLsTestBase { AccessControlList adminACL = new AccessControlList(""); conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString()); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); resourceManager = new MockRM(conf) { protected ClientRMService createClientRMService() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 00466ae..23bed22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -463,7 +464,9 @@ public class TestApplicationMasterService { // Change the priority of App1 to 8 Priority appPriority2 = 
Priority.newInstance(8); - rm.getRMAppManager().updateApplicationPriority(app1.getApplicationId(), + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + rm.getRMAppManager().updateApplicationPriority(ugi, app1.getApplicationId(), appPriority2); AllocateResponse response2 = am1.allocate(allocateRequest); http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index c135384..5ecba12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -1123,7 +1123,7 @@ public class TestClientRMService { when(yarnScheduler.getResourceCalculator()).thenReturn(rs); when(yarnScheduler.checkAndGetApplicationPriority(any(Priority.class), - anyString(), anyString(), any(ApplicationId.class))) + any(UserGroupInformation.class), anyString(), any(ApplicationId.class))) .thenReturn(Priority.newInstance(0)); return yarnScheduler; } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index 164ca20..ff52efd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -344,7 +345,10 @@ public class TestApplicationPriority { // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, + ugi); // get scheduler app FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() @@ -378,7 +382,10 
@@ public class TestApplicationPriority { // Change the priority of App1 to 15 Priority appPriority2 = Priority.newInstance(15); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, + ugi); // get scheduler app FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() @@ -428,7 +435,10 @@ public class TestApplicationPriority { // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, + ugi); // let things settle down Thread.sleep(1000); @@ -557,7 +567,10 @@ public class TestApplicationPriority { // Change the priority of App1 to 3 (lowest) Priority appPriority3 = Priority.newInstance(3); - cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app2.getUser()); + cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null, + ugi); // add request for containers App2 am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList<ContainerId>()); @@ -788,8 +801,10 @@ public class TestApplicationPriority { int appsPendingExpected, int activeAppsExpected, RMApp app) throws YarnException { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app.getUser()); cs.updateApplicationPriority(Priority.newInstance(2), - app.getApplicationId(), null); + app.getApplicationId(), null, ugi); SchedulerEvent removeAttempt; removeAttempt = new AppAttemptRemovedSchedulerEvent( app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED, 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java new file mode 100644 index 0000000..598bd49 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java @@ -0,0 +1,120 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.junit.Assert; +import org.junit.Test; + + +public class TestApplicationPriorityACLConfiguration { + + private final int defaultPriorityQueueA = 3; + private final int defaultPriorityQueueB = -1; + private final int maxPriorityQueueA = 5; + private final int maxPriorityQueueB = 10; + private final int clusterMaxPriority = 10; + + private static final String QUEUE_A_USER = "queueA_user"; + private static final String QUEUE_B_USER = "queueB_user"; + private static final String QUEUE_A_GROUP = "queueA_group"; + + private static final String QUEUEA = "queueA"; + private static final String QUEUEB = "queueB"; + private static final String QUEUEC = "queueC"; + + @Test + public void testSimpleACLConfiguration() throws Exception { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{QUEUEA, QUEUEB, QUEUEC}); + + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); + + // Success case: Configure one user/group level priority acl for queue A. + String[] aclsForA = new String[2]; + aclsForA[0] = QUEUE_A_USER; + aclsForA[1] = QUEUE_A_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(maxPriorityQueueA), + Priority.newInstance(defaultPriorityQueueA), aclsForA); + + // Try to get the ACL configs and make sure there are errors/exceptions + List<AppPriorityACLGroup> pGroupA = csConf.getPriorityAcls( + CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(clusterMaxPriority)); + + // Validate! 
+ verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA, + defaultPriorityQueueA); + } + + @Test + public void testACLConfigurationForInvalidCases() throws Exception { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{QUEUEA, QUEUEB, QUEUEC}); + + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); + + // Success case: Configure one user/group level priority acl for queue A. + String[] aclsForA = new String[2]; + aclsForA[0] = QUEUE_A_USER; + aclsForA[1] = QUEUE_A_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(maxPriorityQueueA), + Priority.newInstance(defaultPriorityQueueA), aclsForA); + + String[] aclsForB = new String[1]; + aclsForB[0] = QUEUE_B_USER; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, + Priority.newInstance(maxPriorityQueueB), + Priority.newInstance(defaultPriorityQueueB), aclsForB); + + // Try to get the ACL configs and make sure there are errors/exceptions + List<AppPriorityACLGroup> pGroupA = csConf.getPriorityAcls( + CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(clusterMaxPriority)); + List<AppPriorityACLGroup> pGroupB = csConf.getPriorityAcls( + CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, + Priority.newInstance(clusterMaxPriority)); + + // Validate stored ACL values with configured ones. 
+ verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA, + defaultPriorityQueueA); + verifyACLs(pGroupB, QUEUE_B_USER, "", maxPriorityQueueB, 0); + } + + private void verifyACLs(List<AppPriorityACLGroup> pGroup, String queueUser, + String queueGroup, int maxPriority, int defaultPriority) { + AppPriorityACLGroup group = pGroup.get(0); + String aclString = queueUser + " " + queueGroup; + + Assert.assertEquals(aclString.trim(), + group.getACLList().getAclString().trim()); + Assert.assertEquals(maxPriority, group.getMaxPriority().getPriority()); + Assert.assertEquals(defaultPriority, + group.getDefaultPriority().getPriority()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/287d3d68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java new file mode 100644 index 0000000..b41ba83 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java @@ -0,0 +1,206 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.ACLsTestBase; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Test; + + +public class TestApplicationPriorityACLs extends ACLsTestBase { + + private final int defaultPriorityQueueA = 3; + private final int defaultPriorityQueueB = 10; + private 
final int maxPriorityQueueA = 5; + private final int maxPriorityQueueB = 11; + private final int clusterMaxPriority = 10; + + @Test + public void testApplicationACLs() throws Exception { + + /* + * Cluster Max-priority is 10. User 'queueA_user' has permission to submit + * apps only at priority 5. Default priority for this user is 3. + */ + + // Case 1: App will be submitted with priority 5. + verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, 5); + + // Case 2: App will be rejected as submitted priority was 6. + verifyAppSubmitWithPriorityFailure(QUEUE_A_USER, QUEUEA, 6); + + // Case 3: App will be submitted w/o priority, hence consider default 3. + verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, -1); + + // Case 4: App will be submitted with priority 11. + verifyAppSubmitWithPrioritySuccess(QUEUE_B_USER, QUEUEB, 11); + } + + private void verifyAppSubmitWithPrioritySuccess(String submitter, + String queueName, int priority) throws Exception { + Priority appPriority = null; + if (priority > 0) { + appPriority = Priority.newInstance(priority); + } else { + // RM will consider default priority for the submitted user. So update + // priority to the default value to compare. + priority = defaultPriorityQueueA; + } + + ApplicationSubmissionContext submissionContext = prepareForAppSubmission( + submitter, queueName, appPriority); + submitAppToRMWithValidAcl(submitter, submissionContext); + + // Ideally get app report here and check the priority. 
+ verifyAppPriorityIsAccepted(submitter, submissionContext.getApplicationId(), + priority); + } + + private void verifyAppSubmitWithPriorityFailure(String submitter, + String queueName, int priority) throws Exception { + Priority appPriority = Priority.newInstance(priority); + ApplicationSubmissionContext submissionContext = prepareForAppSubmission( + submitter, queueName, appPriority); + submitAppToRMWithInValidAcl(submitter, submissionContext); + } + + private ApplicationSubmissionContext prepareForAppSubmission(String submitter, + String queueName, Priority priority) throws Exception { + + GetNewApplicationRequest newAppRequest = GetNewApplicationRequest + .newInstance(); + + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + ApplicationId applicationId = submitterClient + .getNewApplication(newAppRequest).getApplicationId(); + + Resource resource = BuilderUtils.newResource(1024, 1); + + ContainerLaunchContext amContainerSpec = ContainerLaunchContext + .newInstance(null, null, null, null, null, null); + ApplicationSubmissionContext appSubmissionContext = ApplicationSubmissionContext + .newInstance(applicationId, "applicationName", queueName, null, + amContainerSpec, false, true, 1, resource, "applicationType"); + appSubmissionContext.setApplicationId(applicationId); + appSubmissionContext.setQueue(queueName); + if (null != priority) { + appSubmissionContext.setPriority(priority); + } + + return appSubmissionContext; + } + + private void submitAppToRMWithValidAcl(String submitter, + ApplicationSubmissionContext appSubmissionContext) + throws YarnException, IOException, InterruptedException { + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + SubmitApplicationRequest submitRequest = SubmitApplicationRequest + .newInstance(appSubmissionContext); + submitterClient.submitApplication(submitRequest); + resourceManager.waitForState(appSubmissionContext.getApplicationId(), + RMAppState.ACCEPTED); + } + + private void 
submitAppToRMWithInValidAcl(String submitter, + ApplicationSubmissionContext appSubmissionContext) + throws YarnException, IOException, InterruptedException { + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + SubmitApplicationRequest submitRequest = SubmitApplicationRequest + .newInstance(appSubmissionContext); + try { + submitterClient.submitApplication(submitRequest); + } catch (YarnException ex) { + Assert.assertTrue(ex.getCause() instanceof RemoteException); + } + } + + private void verifyAppPriorityIsAccepted(String submitter, + ApplicationId applicationId, int priority) + throws IOException, InterruptedException { + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + + /** + * If priority is greater than cluster max, RM will auto set to cluster max + * Consider this scenario as a special case. + */ + if (priority > clusterMaxPriority) { + priority = clusterMaxPriority; + } + + GetApplicationReportRequest request = GetApplicationReportRequest + .newInstance(applicationId); + try { + GetApplicationReportResponse response = submitterClient + .getApplicationReport(request); + Assert.assertEquals(response.getApplicationReport().getPriority(), + Priority.newInstance(priority)); + } catch (YarnException e) { + Assert.fail("Application submission should not fail."); + } + } + + @Override + protected Configuration createConfiguration() { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{QUEUEA, QUEUEB, QUEUEC}); + + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); + + String[] aclsForA = new String[2]; + aclsForA[0] = QUEUE_A_USER; + aclsForA[1] = QUEUE_A_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." 
+ QUEUEA, + Priority.newInstance(maxPriorityQueueA), + Priority.newInstance(defaultPriorityQueueA), aclsForA); + + String[] aclsForB = new String[2]; + aclsForB[0] = QUEUE_B_USER; + aclsForB[1] = QUEUE_B_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, + Priority.newInstance(maxPriorityQueueB), + Priority.newInstance(defaultPriorityQueueB), aclsForB); + + csConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); + csConf.set(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class.getName()); + + return csConf; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org