This is an automated email from the ASF dual-hosted git repository. snemeth pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 32ecaed YARN-10930. Introduce universal capacity resource vector. Contributed by Andras Gyori 32ecaed is described below commit 32ecaed9c3c06a48ef01d0437e62e8faccd3e9f3 Author: 9uapaw <gyora...@gmail.com> AuthorDate: Fri Oct 22 17:32:33 2021 +0200 YARN-10930. Introduce universal capacity resource vector. Contributed by Andras Gyori --- .../scheduler/capacity/AbstractCSQueue.java | 12 +- .../scheduler/capacity/CSQueue.java | 8 + .../capacity/CapacitySchedulerConfiguration.java | 19 +- .../scheduler/capacity/QueueCapacityVector.java | 258 +++++++++++++++++++++ .../scheduler/capacity/ResourceVector.java | 129 +++++++++++ .../capacity/conf/QueueCapacityConfigParser.java | 215 +++++++++++++++++ .../capacity/TestQueueCapacityVector.java | 111 +++++++++ .../scheduler/capacity/TestResourceVector.java | 118 ++++++++++ .../conf/TestQueueCapacityConfigParser.java | 241 +++++++++++++++++++ 9 files changed, 1108 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 2d9bf85..e3feb51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -131,6 +131,8 @@ public abstract class AbstractCSQueue implements CSQueue { protected CapacityConfigType capacityConfigType = CapacityConfigType.NONE; + protected Map<String, QueueCapacityVector> configuredCapacityVectors; + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); protected CapacitySchedulerContext csContext; @@ -374,6 +376,8 @@ public abstract class AbstractCSQueue implements CSQueue { this.reservationsContinueLooking = configuration.getReservationContinueLook(); + this.configuredCapacityVectors = csContext.getConfiguration() + .parseConfiguredResourceVector(queuePath, configuredNodeLabels); // Update metrics CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, @@ -688,6 +692,12 @@ public abstract class AbstractCSQueue implements CSQueue { minimumAllocation); } + @Override + public QueueCapacityVector getConfiguredCapacityVector( + String label) { + return configuredCapacityVectors.get(label); + } + private void initializeQueueState(CapacitySchedulerConfiguration configuration) { QueueState previousState = getState(); QueueState configuredState = configuration @@ -978,7 +988,7 @@ public abstract class AbstractCSQueue implements CSQueue { "Default lifetime " + defaultAppLifetime + " can't exceed maximum lifetime " + myMaxAppLifetime); } - + if (defaultAppLifetime <= 0) { defaultAppLifetime = myMaxAppLifetime; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 03a5afb..2acc1d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -420,6 +420,14 @@ public interface CSQueue extends SchedulerQueue<CSQueue> { Resource getEffectiveCapacity(String label); /** + * Get configured capacity resource vector parsed from the capacity config + * of the queue. + * @param label node label (partition) + * @return capacity resource vector + */ + QueueCapacityVector getConfiguredCapacityVector(String label); + + /** * Get effective capacity of queue. If min/max resource is configured, * preference will be given to absolute configuration over normal capacity. * Also round down the result to normalizeDown. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index e7b1cbd..615a4d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.server.resourcemanager.placement.csmappingrule.MappingRule; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.QueueCapacityConfigParser; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.MappingRuleCreator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,9 +74,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.Set; public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration { @@ -413,6 +414,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final String MAPPING_RULE_FORMAT_DEFAULT = MAPPING_RULE_FORMAT_LEGACY; + + private static final QueueCapacityConfigParser queueCapacityConfigParser + = new QueueCapacityConfigParser(); + private ConfigurationProperties configurationProperties; /** @@ -454,7 +459,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return PREFIX + "user." + user + DOT; } - private String getNodeLabelPrefix(String queue, String label) { + public static String getNodeLabelPrefix(String queue, String label) { if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { return getQueuePrefix(queue); } @@ -2571,6 +2576,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_CAPACITY); } + public Map<String, QueueCapacityVector> parseConfiguredResourceVector( + String queuePath, Set<String> labels) { + Map<String, QueueCapacityVector> queueResourceVectors = new HashMap<>(); + for (String label : labels) { + queueResourceVectors.put(label, queueCapacityConfigParser.parse(this, queuePath, label)); + } + + return queueResourceVectors; + } + private void updateMinMaxResourceToConf(String label, String queue, Resource resource, String type) { if (queue.equals("root")) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityVector.java new file mode 100644 index 0000000..9f6e0e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityVector.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.ResourceInformation; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * Contains capacity values with calculation types associated for each + * resource. + */ +public class QueueCapacityVector implements + Iterable<QueueCapacityVector.QueueCapacityVectorEntry> { + private static final String START_PARENTHESES = "["; + private static final String END_PARENTHESES = "]"; + private static final String RESOURCE_DELIMITER = ","; + private static final String VALUE_DELIMITER = "="; + + private final ResourceVector resource; + private final Map<String, QueueCapacityType> capacityTypes + = new HashMap<>(); + private final Map<QueueCapacityType, Set<String>> capacityTypePerResource + = new HashMap<>(); + + public QueueCapacityVector() { + this.resource = new ResourceVector(); + } + + private QueueCapacityVector(ResourceVector resource) { + this.resource = resource; + } + + /** + * Creates a zero {@code QueueCapacityVector}. The resources are defined + * in absolute capacity type by default. + * + * @return zero capacity vector + */ + public static QueueCapacityVector newInstance() { + QueueCapacityVector newCapacityVector = + new QueueCapacityVector(ResourceVector.newInstance()); + for (Map.Entry<String, Float> resourceEntry : newCapacityVector.resource) { + newCapacityVector.storeResourceType(resourceEntry.getKey(), + QueueCapacityType.ABSOLUTE); + } + + return newCapacityVector; + } + + /** + * Creates a uniform and homogeneous {@code QueueCapacityVector}. + * The resources are defined in absolute capacity type by default. + * + * @param value value to be set for each resource + * @param capacityType capacity type to be set for each resource + * @return uniform capacity vector + */ + public static QueueCapacityVector of( + float value, QueueCapacityType capacityType) { + QueueCapacityVector newCapacityVector = + new QueueCapacityVector(ResourceVector.of(value)); + for (Map.Entry<String, Float> resourceEntry : newCapacityVector.resource) { + newCapacityVector.storeResourceType(resourceEntry.getKey(), capacityType); + } + + return newCapacityVector; + } + + public QueueCapacityVectorEntry getResource(String resourceName) { + return new QueueCapacityVectorEntry(capacityTypes.get(resourceName), + resourceName, resource.getValue(resourceName)); + } + + /** + * Returns the number of resources defined for this vector. + * + * @return number of resources + */ + public int getResourceCount() { + return capacityTypes.size(); + } + + /** + * Set the value and capacity type of a resource. + * + * @param resourceName name of the resource + * @param value value of the resource + * @param capacityType type of the resource + */ + public void setResource(String resourceName, float value, + QueueCapacityType capacityType) { + // Necessary due to backward compatibility (memory = memory-mb) + String convertedResourceName = resourceName; + if (resourceName.equals("memory")) { + convertedResourceName = ResourceInformation.MEMORY_URI; + } + resource.setValue(convertedResourceName, value); + storeResourceType(convertedResourceName, capacityType); + } + + /** + * A shorthand to retrieve the value stored for the memory resource. + * + * @return value of memory resource + */ + public float getMemory() { + return resource.getValue(ResourceInformation.MEMORY_URI); + } + + /** + * Returns the name of all resources that are defined in the given capacity + * type. + * + * @param capacityType the capacity type of the resources + * @return all resource names for the given capacity type + */ + public Set<String> getResourceNamesByCapacityType( + QueueCapacityType capacityType) { + return capacityTypePerResource.getOrDefault(capacityType, + Collections.emptySet()); + } + + public boolean isResourceOfType( + String resourceName, QueueCapacityType capacityType) { + return capacityTypes.containsKey(resourceName) && + capacityTypes.get(resourceName).equals(capacityType); + } + + @Override + public Iterator<QueueCapacityVectorEntry> iterator() { + return new Iterator<QueueCapacityVectorEntry>() { + private final Iterator<Map.Entry<String, Float>> resources = + resource.iterator(); + private int i = 0; + + @Override + public boolean hasNext() { + return resources.hasNext() && capacityTypes.size() > i; + } + + @Override + public QueueCapacityVectorEntry next() { + Map.Entry<String, Float> resourceInformation = resources.next(); + i++; + return new QueueCapacityVectorEntry( + capacityTypes.get(resourceInformation.getKey()), + resourceInformation.getKey(), resourceInformation.getValue()); + } + }; + } + + /** + * Returns a set of all capacity type defined for this vector. + * + * @return capacity types + */ + public Set<QueueCapacityType> getDefinedCapacityTypes() { + return capacityTypePerResource.keySet(); + } + + private void storeResourceType( + String resourceName, QueueCapacityType resourceType) { + if (capacityTypes.get(resourceName) != null + && !capacityTypes.get(resourceName).equals(resourceType)) { + capacityTypePerResource.get(capacityTypes.get(resourceName)) + .remove(resourceName); + } + + capacityTypePerResource.putIfAbsent(resourceType, new HashSet<>()); + capacityTypePerResource.get(resourceType).add(resourceName); + capacityTypes.put(resourceName, resourceType); + } + + @Override + public String toString() { + StringBuilder stringVector = new StringBuilder(); + stringVector.append(START_PARENTHESES); + + int resourceCount = 0; + for (Map.Entry<String, Float> resourceEntry : resource) { + resourceCount++; + stringVector.append(resourceEntry.getKey()) + .append(VALUE_DELIMITER) + .append(resourceEntry.getValue()) + .append(capacityTypes.get(resourceEntry.getKey()).postfix); + if (resourceCount < capacityTypes.size()) { + stringVector.append(RESOURCE_DELIMITER); + } + } + + stringVector.append(END_PARENTHESES); + + return stringVector.toString(); + } + + /** + * Represents a capacity type associated with its syntax postfix. + */ + public enum QueueCapacityType { + PERCENTAGE("%"), ABSOLUTE(""), WEIGHT("w"); + private final String postfix; + + QueueCapacityType(String postfix) { + this.postfix = postfix; + } + + public String getPostfix() { + return postfix; + } + } + + public static class QueueCapacityVectorEntry { + private final QueueCapacityType vectorResourceType; + private final float resourceValue; + private final String resourceName; + + public QueueCapacityVectorEntry(QueueCapacityType vectorResourceType, + String resourceName, float resourceValue) { + this.vectorResourceType = vectorResourceType; + this.resourceValue = resourceValue; + this.resourceName = resourceName; + } + + public QueueCapacityType getVectorResourceType() { + return vectorResourceType; + } + + public float getResourceValue() { + return resourceValue; + } + + public String getResourceName() { + return resourceName; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java new file mode 100644 index 0000000..88c09af --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Represents a simple resource floating point value storage + * grouped by resource names. + */ +public class ResourceVector implements Iterable<Map.Entry<String, Float>> { + private final Map<String, Float> resourcesByName = new HashMap<>(); + + /** + * Creates a new {@code ResourceVector} with all pre-defined resources set to + * zero. + * @return zero resource vector + */ + public static ResourceVector newInstance() { + ResourceVector zeroResourceVector = new ResourceVector(); + for (ResourceInformation resource : ResourceUtils.getResourceTypesArray()) { + zeroResourceVector.setValue(resource.getName(), 0); + } + + return zeroResourceVector; + } + + /** + * Creates a new {@code ResourceVector} with all pre-defined resources set to + * the same value. + * @param value the value to set all resources to + * @return uniform resource vector + */ + public static ResourceVector of(float value) { + ResourceVector emptyResourceVector = new ResourceVector(); + for (ResourceInformation resource : ResourceUtils.getResourceTypesArray()) { + emptyResourceVector.setValue(resource.getName(), value); + } + + return emptyResourceVector; + } + + /** + * Creates a new {@code ResourceVector} with the values set in a + * {@code Resource} object. + * @param resource resource object the resource vector will be based on + * @return uniform resource vector + */ + public static ResourceVector of(Resource resource) { + ResourceVector resourceVector = new ResourceVector(); + for (ResourceInformation resourceInformation : resource.getResources()) { + resourceVector.setValue(resourceInformation.getName(), + resourceInformation.getValue()); + } + + return resourceVector; + } + + /** + * Subtract values for each resource defined in the given resource vector. + * @param otherResourceVector rhs resource vector of the subtraction + */ + public void subtract(ResourceVector otherResourceVector) { + for (Map.Entry<String, Float> resource : otherResourceVector) { + setValue(resource.getKey(), getValue(resource.getKey()) - resource.getValue()); + } + } + + /** + * Increments the given resource by the specified value. + * @param resourceName name of the resource + * @param value value to be added to the resource's current value + */ + public void increment(String resourceName, float value) { + setValue(resourceName, getValue(resourceName) + value); + } + + public Float getValue(String resourceName) { + return resourcesByName.get(resourceName); + } + + public void setValue(String resourceName, float value) { + resourcesByName.put(resourceName, value); + } + + @Override + public Iterator<Map.Entry<String, Float>> iterator() { + return resourcesByName.entrySet().iterator(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + return this.resourcesByName.equals(((ResourceVector) o).resourcesByName); + } + + @Override + public int hashCode() { + return resourcesByName.hashCode(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueCapacityConfigParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueCapacityConfigParser.java new file mode 100644 index 0000000..28eb33c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueCapacityConfigParser.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A class that parses {@code QueueCapacityVector} from the capacity + * configuration property set for a queue. + * + * A new syntax for capacity property could be implemented, by creating a parser + * with a regex to match the pattern and a method that creates a + * {@code QueueCapacityVector} from the matched pattern. + * Extending the parsers field with a {@code Parser} object in the constructor + * is needed in this case. + * + * A new capacity type for the existing parsers could be added by extending + * the {@code QueueCapacityVector.QueueCapacityType} with a new type and its + * associated postfix symbol. + */ +public class QueueCapacityConfigParser { + private static final String UNIFORM_REGEX = "^([0-9.]+)(.*)"; + private static final String RESOURCE_REGEX = "^\\[([\\w\\.,\\-_%\\ /]+=[\\w\\.,\\-_%\\ /]+)+\\]$"; + + private static final Pattern RESOURCE_PATTERN = Pattern.compile(RESOURCE_REGEX); + private static final Pattern UNIFORM_PATTERN = Pattern.compile(UNIFORM_REGEX); + public static final String FLOAT_DIGIT_REGEX = "[0-9.]"; + + private final List<Parser> parsers = new ArrayList<>(); + + public QueueCapacityConfigParser() { + parsers.add(new Parser(RESOURCE_PATTERN, this::heterogeneousParser)); + parsers.add(new Parser(UNIFORM_PATTERN, this::uniformParser)); + } + + /** + * Creates a {@code QueueCapacityVector} parsed from the capacity configuration + * property set for a queue. + * @param conf configuration object + * @param queuePath queue for which the capacity property is parsed + * @param label node label + * @return a parsed capacity vector + */ + public QueueCapacityVector parse(CapacitySchedulerConfiguration conf, + String queuePath, String label) { + + if (queuePath.equals(CapacitySchedulerConfiguration.ROOT)) { + return QueueCapacityVector.of(100f, QueueCapacityType.PERCENTAGE); + } + + String propertyName = CapacitySchedulerConfiguration.getNodeLabelPrefix( + queuePath, label) + CapacitySchedulerConfiguration.CAPACITY; + String capacityString = conf.get(propertyName); + + if (capacityString == null) { + return new QueueCapacityVector(); + } + // Trim all spaces from capacity string + capacityString = capacityString.replaceAll(" ", ""); + + for (Parser parser : parsers) { + Matcher matcher = parser.regex.matcher(capacityString); + if (matcher.find()) { + return parser.parser.apply(matcher); + } + } + + return new QueueCapacityVector(); + } + + /** + * A parser method that is usable on uniform capacity values e.g. percentage or + * weight. + * @param matcher a regex matcher that contains parsed value and its possible + * suffix + * @return a parsed capacity vector + */ + private QueueCapacityVector uniformParser(Matcher matcher) { + QueueCapacityType capacityType = null; + String value = matcher.group(1); + if (matcher.groupCount() == 2) { + String matchedSuffix = matcher.group(2); + for (QueueCapacityType suffix : QueueCapacityType.values()) { + // Absolute uniform syntax is not supported + if (suffix.equals(QueueCapacityType.ABSOLUTE)) { + continue; + } + // when capacity is given in percentage, we do not need % symbol + String uniformSuffix = suffix.getPostfix().replaceAll("%", ""); + if (uniformSuffix.equals(matchedSuffix)) { + capacityType = suffix; + } + } + } + + if (capacityType == null) { + return new QueueCapacityVector(); + } + + return QueueCapacityVector.of(Float.parseFloat(value), capacityType); + } + + /** + * A parser method that is usable on resource capacity values e.g. mixed or + * absolute resource. + * @param matcher a regex matcher that contains the matched resource string + * @return a parsed capacity vector + */ + private QueueCapacityVector heterogeneousParser(Matcher matcher) { + QueueCapacityVector capacityVector = QueueCapacityVector.newInstance(); + + /* + * Absolute resource configuration for a queue will be grouped by "[]". + * Syntax of absolute resource config could be like below + * "memory=4Gi vcores=2". Ideally this means "4GB of memory and 2 vcores". + */ + // Get the sub-group. + String bracketedGroup = matcher.group(0); + // Get the string inside starting and closing [] + bracketedGroup = bracketedGroup.substring(1, bracketedGroup.length() - 1); + // Split by comma and equals delimiter eg. the string memory=1024,vcores=6 + // is converted to an array of array as {{memory,1024}, {vcores, 6}} + for (String kvPair : bracketedGroup.trim().split(",")) { + String[] splits = kvPair.split("="); + + // Ensure that each sub string is key value pair separated by '='. + if (splits.length > 1) { + setCapacityVector(capacityVector, splits[0], splits[1]); + } + } + + // Memory always have to be defined + if (capacityVector.getMemory() == 0L) { + return new QueueCapacityVector(); + } + + return capacityVector; + } + + private void setCapacityVector( + QueueCapacityVector resource, String resourceName, String resourceValue) { + QueueCapacityType capacityType = QueueCapacityType.ABSOLUTE; + + // Extract suffix from a value e.g. for 6w extract w + String suffix = resourceValue.replaceAll(FLOAT_DIGIT_REGEX, ""); + if (!resourceValue.endsWith(suffix)) { + return; + } + + float parsedResourceValue = Float.parseFloat(resourceValue.substring( + 0, resourceValue.length() - suffix.length())); + float convertedValue = parsedResourceValue; + + if (!suffix.isEmpty() && UnitsConversionUtil.KNOWN_UNITS.contains(suffix)) { + // Convert all incoming units to MB if units is configured. + convertedValue = UnitsConversionUtil.convert(suffix, "Mi", (long) parsedResourceValue); + } else { + for (QueueCapacityType capacityTypeSuffix : QueueCapacityType.values()) { + if (capacityTypeSuffix.getPostfix().equals(suffix)) { + capacityType = capacityTypeSuffix; + } + } + } + + resource.setResource(resourceName, convertedValue, capacityType); + } + + /** + * Checks whether the given capacity string is in a capacity vector compatible + * format. + * @param configuredCapacity capacity string + * @return true, if capacity string is in capacity vector format, + * false otherwise + */ + public boolean isCapacityVectorFormat(String configuredCapacity) { + return configuredCapacity != null + && RESOURCE_PATTERN.matcher(configuredCapacity).find(); + } + + private static class Parser { + private final Pattern regex; + private final Function<Matcher, QueueCapacityVector> parser; + + Parser(Pattern regex, Function<Matcher, QueueCapacityVector> parser) { + this.regex = regex; + this.parser = parser; + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacityVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacityVector.java new file mode 100644 index 0000000..058e14b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacityVector.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.util.Lists; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; + +public class TestQueueCapacityVector { + private static final String CUSTOM_RESOURCE = "custom"; + public static final String MIXED_CAPACITY_VECTOR_STRING = + "[custom=3.0,memory-mb=10.0w,vcores=6.0%]"; + + private final YarnConfiguration conf = new YarnConfiguration(); + + @Before + public void setUp() { + conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE); + ResourceUtils.resetResourceTypes(conf); + } + + @Test + public void getResourceNamesByCapacityType() { + QueueCapacityVector capacityVector = QueueCapacityVector.newInstance(); + + capacityVector.setResource(MEMORY_URI, 10, QueueCapacityType.PERCENTAGE); + capacityVector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE); + + // custom is not set, defaults to 0 + Assert.assertEquals(1, capacityVector.getResourceNamesByCapacityType( + QueueCapacityType.ABSOLUTE).size()); + Assert.assertTrue(capacityVector.getResourceNamesByCapacityType( + QueueCapacityType.ABSOLUTE).contains(CUSTOM_RESOURCE)); + + Assert.assertEquals(2, capacityVector.getResourceNamesByCapacityType( + QueueCapacityType.PERCENTAGE).size()); + Assert.assertTrue(capacityVector.getResourceNamesByCapacityType( + QueueCapacityType.PERCENTAGE).contains(VCORES_URI)); + Assert.assertTrue(capacityVector.getResourceNamesByCapacityType( + QueueCapacityType.PERCENTAGE).contains(MEMORY_URI)); + Assert.assertEquals(10, capacityVector.getResource(MEMORY_URI).getResourceValue(), EPSILON); + Assert.assertEquals(6, capacityVector.getResource(VCORES_URI).getResourceValue(), EPSILON); + } + + @Test + public void isResourceOfType() { + QueueCapacityVector capacityVector = QueueCapacityVector.newInstance(); + + capacityVector.setResource(MEMORY_URI, 10, QueueCapacityType.WEIGHT); + capacityVector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE); + capacityVector.setResource(CUSTOM_RESOURCE, 3, QueueCapacityType.ABSOLUTE); + + Assert.assertTrue(capacityVector.isResourceOfType(MEMORY_URI, QueueCapacityType.WEIGHT)); + Assert.assertTrue(capacityVector.isResourceOfType(VCORES_URI, QueueCapacityType.PERCENTAGE)); + Assert.assertTrue(capacityVector.isResourceOfType(CUSTOM_RESOURCE, QueueCapacityType.ABSOLUTE)); + } + + @Test + public void testIterator() { + QueueCapacityVector capacityVector = QueueCapacityVector.newInstance(); + List<QueueCapacityVectorEntry> entries = Lists.newArrayList(capacityVector); + + Assert.assertEquals(3, entries.size()); + + QueueCapacityVector emptyCapacityVector = new QueueCapacityVector(); + List<QueueCapacityVectorEntry> emptyEntries = Lists.newArrayList(emptyCapacityVector); + + Assert.assertEquals(0, emptyEntries.size()); + } + + @Test + public void testToString() { + QueueCapacityVector capacityVector = QueueCapacityVector.newInstance(); + + capacityVector.setResource(MEMORY_URI, 10, QueueCapacityType.WEIGHT); + capacityVector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE); + capacityVector.setResource(CUSTOM_RESOURCE, 3, QueueCapacityType.ABSOLUTE); + + Assert.assertEquals(MIXED_CAPACITY_VECTOR_STRING, capacityVector.toString()); + + QueueCapacityVector emptyCapacityVector = new QueueCapacityVector(); + Assert.assertEquals("[]", emptyCapacityVector.toString()); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestResourceVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestResourceVector.java new file mode 100644 index 0000000..fd6edb1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestResourceVector.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; + +public class TestResourceVector { + private final static String CUSTOM_RESOURCE = "custom"; + + private final YarnConfiguration conf = new YarnConfiguration(); + + @Before + public void setUp() { + conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE); + ResourceUtils.resetResourceTypes(conf); + } + + @Test + public void testCreation() { + ResourceVector zeroResourceVector = ResourceVector.newInstance(); + Assert.assertEquals(0, zeroResourceVector.getValue(MEMORY_URI), EPSILON); + Assert.assertEquals(0, zeroResourceVector.getValue(VCORES_URI), EPSILON); + Assert.assertEquals(0, zeroResourceVector.getValue(CUSTOM_RESOURCE), EPSILON); + + ResourceVector uniformResourceVector = ResourceVector.of(10); + Assert.assertEquals(10, uniformResourceVector.getValue(MEMORY_URI), EPSILON); + Assert.assertEquals(10, uniformResourceVector.getValue(VCORES_URI), EPSILON); + Assert.assertEquals(10, uniformResourceVector.getValue(CUSTOM_RESOURCE), EPSILON); + + Map<String, Long> customResources = new HashMap<>(); + customResources.put(CUSTOM_RESOURCE, 2L); + Resource resource = Resource.newInstance(10, 5, customResources); + ResourceVector resourceVectorFromResource = ResourceVector.of(resource); + Assert.assertEquals(10, resourceVectorFromResource.getValue(MEMORY_URI), EPSILON); + Assert.assertEquals(5, resourceVectorFromResource.getValue(VCORES_URI), EPSILON); + Assert.assertEquals(2, resourceVectorFromResource.getValue(CUSTOM_RESOURCE), EPSILON); + } + + @Test + public void testSubtract() { + ResourceVector lhsResourceVector = ResourceVector.of(13); + ResourceVector rhsResourceVector = ResourceVector.of(5); + lhsResourceVector.subtract(rhsResourceVector); + + Assert.assertEquals(8, lhsResourceVector.getValue(MEMORY_URI), EPSILON); + Assert.assertEquals(8, lhsResourceVector.getValue(VCORES_URI), EPSILON); + Assert.assertEquals(8, lhsResourceVector.getValue(CUSTOM_RESOURCE), EPSILON); + + ResourceVector negativeResourceVector = ResourceVector.of(-100); + + // Check whether overflow causes any issues + negativeResourceVector.subtract(ResourceVector.of(Float.MAX_VALUE)); + Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(MEMORY_URI), EPSILON); + Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(VCORES_URI), EPSILON); + Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(CUSTOM_RESOURCE), + EPSILON); + + } + + @Test + public void testIncrement() { + ResourceVector resourceVector = ResourceVector.of(13); + resourceVector.increment(MEMORY_URI, 5); + + Assert.assertEquals(18, resourceVector.getValue(MEMORY_URI), EPSILON); + Assert.assertEquals(13, resourceVector.getValue(VCORES_URI), EPSILON); + Assert.assertEquals(13, resourceVector.getValue(CUSTOM_RESOURCE), EPSILON); + + // Check whether overflow causes any issues + ResourceVector maxFloatResourceVector = ResourceVector.of(Float.MAX_VALUE); + maxFloatResourceVector.increment(MEMORY_URI, 100); + Assert.assertEquals(Float.MAX_VALUE, maxFloatResourceVector.getValue(MEMORY_URI), EPSILON); + } + + @Test + public void testEquals() { + ResourceVector resourceVector = ResourceVector.of(13); + ResourceVector resourceVectorOther = ResourceVector.of(14); + Resource resource = Resource.newInstance(13, 13); + + Assert.assertNotEquals(null, resourceVector); + Assert.assertNotEquals(resourceVectorOther, resourceVector); + Assert.assertNotEquals(resource, resourceVector); + + ResourceVector resourceVectorOne = ResourceVector.of(1); + resourceVectorOther.subtract(resourceVectorOne); + + Assert.assertEquals(resourceVectorOther, resourceVector); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestQueueCapacityConfigParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestQueueCapacityConfigParser.java new file mode 100644 index 0000000..1aba816 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestQueueCapacityConfigParser.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.util.Lists; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; +import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources.GB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; + +public class TestQueueCapacityConfigParser { + + private static final String ALL_RESOURCE_TEMPLATE = "[memory-mb=%s, vcores=%s, yarn.io/gpu=%s]"; + private static final String MEMORY_VCORE_TEMPLATE = "[memory-mb=%s, vcores=%s]"; + + private static final String MEMORY_ABSOLUTE = "12Gi"; + private static final float VCORE_ABSOLUTE = 6; + private static final float GPU_ABSOLUTE = 10; + + private static final float PERCENTAGE_VALUE = 50f; + private static final float MEMORY_MIXED = 1024; + private static final float WEIGHT_VALUE = 6; + + private static final String QUEUE = "root.test"; + + private static final String ABSOLUTE_RESOURCE = String.format( + ALL_RESOURCE_TEMPLATE, MEMORY_ABSOLUTE, VCORE_ABSOLUTE, GPU_ABSOLUTE); + private static final String ABSOLUTE_RESOURCE_MEMORY_VCORE = String.format( + MEMORY_VCORE_TEMPLATE, MEMORY_ABSOLUTE, VCORE_ABSOLUTE); + private static final String MIXED_RESOURCE = String.format( + ALL_RESOURCE_TEMPLATE, MEMORY_MIXED, PERCENTAGE_VALUE + "%", WEIGHT_VALUE + "w"); + private static final String RESOURCE_TYPES = GPU_URI; + + public static final String NONEXISTINGSUFFIX = "50nonexistingsuffix"; + public static final String EMPTY_BRACKET = "[]"; + public static final String INVALID_CAPACITY_BRACKET = "[invalid]"; + public static final String INVALID_CAPACITY_FORMAT = "[memory-100,vcores-60]"; + + private final QueueCapacityConfigParser capacityConfigParser + = new QueueCapacityConfigParser(); + + @Test + public void testPercentageCapacityConfig() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setCapacity(QUEUE, PERCENTAGE_VALUE); + + QueueCapacityVector percentageCapacityVector = capacityConfigParser.parse(conf, QUEUE, + NO_LABEL); + QueueCapacityVectorEntry memory = percentageCapacityVector.getResource(MEMORY_URI); + QueueCapacityVectorEntry vcore = percentageCapacityVector.getResource(VCORES_URI); + + Assert.assertEquals(QueueCapacityType.PERCENTAGE, memory.getVectorResourceType()); + Assert.assertEquals(PERCENTAGE_VALUE, memory.getResourceValue(), EPSILON); + + Assert.assertEquals(QueueCapacityType.PERCENTAGE, vcore.getVectorResourceType()); + Assert.assertEquals(PERCENTAGE_VALUE, vcore.getResourceValue(), EPSILON); + + QueueCapacityVector rootCapacityVector = capacityConfigParser.parse(conf, + CapacitySchedulerConfiguration.ROOT, NO_LABEL); + + QueueCapacityVectorEntry memoryRoot = rootCapacityVector.getResource(MEMORY_URI); + QueueCapacityVectorEntry vcoreRoot = rootCapacityVector.getResource(VCORES_URI); + + Assert.assertEquals(QueueCapacityType.PERCENTAGE, memoryRoot.getVectorResourceType()); + Assert.assertEquals(100f, memoryRoot.getResourceValue(), EPSILON); + + Assert.assertEquals(QueueCapacityType.PERCENTAGE, vcoreRoot.getVectorResourceType()); + Assert.assertEquals(100f, vcoreRoot.getResourceValue(), EPSILON); + } + + @Test + public void testWeightCapacityConfig() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setNonLabeledQueueWeight(QUEUE, WEIGHT_VALUE); + + QueueCapacityVector weightCapacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL); + + QueueCapacityVectorEntry memory = weightCapacityVector.getResource(MEMORY_URI); + QueueCapacityVectorEntry vcore = weightCapacityVector.getResource(VCORES_URI); + + Assert.assertEquals(QueueCapacityType.WEIGHT, memory.getVectorResourceType()); + Assert.assertEquals(WEIGHT_VALUE, memory.getResourceValue(), EPSILON); + + Assert.assertEquals(QueueCapacityType.WEIGHT, vcore.getVectorResourceType()); + Assert.assertEquals(WEIGHT_VALUE, vcore.getResourceValue(), EPSILON); + } + + @Test + public void testAbsoluteCapacityVectorConfig() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) + + CapacitySchedulerConfiguration.CAPACITY, ABSOLUTE_RESOURCE); + conf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_TYPES); + ResourceUtils.resetResourceTypes(conf); + + QueueCapacityVector absoluteCapacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL); + + Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(MEMORY_URI) + .getVectorResourceType()); + Assert.assertEquals(12 * GB, absoluteCapacityVector.getResource(MEMORY_URI) + .getResourceValue(), EPSILON); + + Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(VCORES_URI) + .getVectorResourceType()); + Assert.assertEquals(VCORE_ABSOLUTE, absoluteCapacityVector.getResource(VCORES_URI) + .getResourceValue(), EPSILON); + + Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(GPU_URI) + .getVectorResourceType()); + Assert.assertEquals(GPU_ABSOLUTE, absoluteCapacityVector.getResource(GPU_URI) + .getResourceValue(), EPSILON); + + conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) + + CapacitySchedulerConfiguration.CAPACITY, ABSOLUTE_RESOURCE_MEMORY_VCORE); + QueueCapacityVector withoutGpuVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL); + + Assert.assertEquals(3, withoutGpuVector.getResourceCount()); + Assert.assertEquals(0f, withoutGpuVector.getResource(GPU_URI).getResourceValue(), EPSILON); + } + + @Test + public void testMixedCapacityConfig() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) + + CapacitySchedulerConfiguration.CAPACITY, MIXED_RESOURCE); + conf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_TYPES); + ResourceUtils.resetResourceTypes(conf); + + QueueCapacityVector mixedCapacityVector = + capacityConfigParser.parse(conf, QUEUE, NO_LABEL); + + Assert.assertEquals(QueueCapacityType.ABSOLUTE, + mixedCapacityVector.getResource(MEMORY_URI).getVectorResourceType()); + Assert.assertEquals(MEMORY_MIXED, mixedCapacityVector.getResource(MEMORY_URI) + .getResourceValue(), EPSILON); + + Assert.assertEquals(QueueCapacityType.PERCENTAGE, + mixedCapacityVector.getResource(VCORES_URI).getVectorResourceType()); + Assert.assertEquals(PERCENTAGE_VALUE, + mixedCapacityVector.getResource(VCORES_URI).getResourceValue(), EPSILON); + + Assert.assertEquals(QueueCapacityType.WEIGHT, + mixedCapacityVector.getResource(GPU_URI).getVectorResourceType()); + Assert.assertEquals(WEIGHT_VALUE, + mixedCapacityVector.getResource(GPU_URI).getResourceValue(), EPSILON); + + // Test undefined capacity type default value + conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) + + CapacitySchedulerConfiguration.CAPACITY, ABSOLUTE_RESOURCE_MEMORY_VCORE); + + QueueCapacityVector mixedCapacityVectorWithGpuUndefined = + capacityConfigParser.parse(conf, QUEUE, NO_LABEL); + Assert.assertEquals(QueueCapacityType.ABSOLUTE, + mixedCapacityVectorWithGpuUndefined.getResource(MEMORY_URI).getVectorResourceType()); + Assert.assertEquals(0, mixedCapacityVectorWithGpuUndefined.getResource(GPU_URI) + .getResourceValue(), EPSILON); + + } + + @Test + public void testInvalidCapacityConfigs() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + + conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) + + CapacitySchedulerConfiguration.CAPACITY, NONEXISTINGSUFFIX); + QueueCapacityVector capacityVectorWithInvalidSuffix = + capacityConfigParser.parse(conf, QUEUE, NO_LABEL); + List<QueueCapacityVectorEntry> entriesWithInvalidSuffix = + Lists.newArrayList(capacityVectorWithInvalidSuffix.iterator()); + Assert.assertEquals(0, entriesWithInvalidSuffix.size()); + + conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) + + CapacitySchedulerConfiguration.CAPACITY, INVALID_CAPACITY_FORMAT); + QueueCapacityVector invalidDelimiterCapacityVector = + capacityConfigParser.parse(conf, QUEUE, NO_LABEL); + List<QueueCapacityVectorEntry> invalidDelimiterEntries = + Lists.newArrayList(invalidDelimiterCapacityVector.iterator()); + Assert.assertEquals(0, invalidDelimiterEntries.size()); + + conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) + + CapacitySchedulerConfiguration.CAPACITY, INVALID_CAPACITY_BRACKET); + QueueCapacityVector invalidCapacityVector = + capacityConfigParser.parse(conf, QUEUE, NO_LABEL); + List<QueueCapacityVectorEntry> resources = + Lists.newArrayList(invalidCapacityVector.iterator()); + Assert.assertEquals(0, resources.size()); + + conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) + + CapacitySchedulerConfiguration.CAPACITY, EMPTY_BRACKET); + QueueCapacityVector emptyBracketCapacityVector = + capacityConfigParser.parse(conf, QUEUE, NO_LABEL); + List<QueueCapacityVectorEntry> emptyEntries = + Lists.newArrayList(emptyBracketCapacityVector.iterator()); + Assert.assertEquals(0, emptyEntries.size()); + + conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) + + CapacitySchedulerConfiguration.CAPACITY, ""); + QueueCapacityVector emptyCapacity = + capacityConfigParser.parse(conf, QUEUE, NO_LABEL); + List<QueueCapacityVectorEntry> emptyResources = + Lists.newArrayList(emptyCapacity.iterator()); + Assert.assertEquals(emptyResources.size(), 0); + + conf.unset(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) + + CapacitySchedulerConfiguration.CAPACITY); + QueueCapacityVector nonSetCapacity = + capacityConfigParser.parse(conf, QUEUE, NO_LABEL); + List<QueueCapacityVectorEntry> nonSetResources = + Lists.newArrayList(nonSetCapacity.iterator()); + Assert.assertEquals(nonSetResources.size(), 0); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org