YARN-7522. Introduce AllocationTagsManager to associate allocation tags to nodes. (Wangda Tan via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/801c0988 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/801c0988 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/801c0988 Branch: refs/heads/YARN-6592 Commit: 801c0988b5ad1eff1e896a2635c2937721c96b04 Parents: 69de9a1 Author: Arun Suresh <asur...@apache.org> Authored: Fri Dec 8 00:24:00 2017 -0800 Committer: Arun Suresh <asur...@apache.org> Committed: Wed Jan 31 01:30:17 2018 -0800 ---------------------------------------------------------------------- .../resourcemanager/RMActiveServiceContext.java | 15 + .../yarn/server/resourcemanager/RMContext.java | 5 + .../server/resourcemanager/RMContextImpl.java | 12 + .../server/resourcemanager/ResourceManager.java | 9 + .../constraint/AllocationTagsManager.java | 431 +++++++++++++++++++ .../constraint/AllocationTagsNamespaces.java | 31 ++ .../InvalidAllocationTagsQueryException.java | 35 ++ .../rmcontainer/RMContainer.java | 8 + .../rmcontainer/RMContainerImpl.java | 21 + .../constraint/TestAllocationTagsManager.java | 328 ++++++++++++++ .../rmcontainer/TestRMContainerImpl.java | 124 ++++++ .../scheduler/capacity/TestUtils.java | 9 + .../scheduler/fifo/TestFifoScheduler.java | 5 + 13 files changed, 1033 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 9dc5945..6ee3a4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; +import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -107,6 +108,7 @@ public class RMActiveServiceContext { private RMAppLifetimeMonitor rmAppLifetimeMonitor; private QueueLimitCalculator queueLimitCalculator; + private AllocationTagsManager allocationTagsManager; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -398,6 +400,19 @@ public class RMActiveServiceContext { @Private @Unstable + public AllocationTagsManager getAllocationTagsManager() { + return allocationTagsManager; + } + + @Private + @Unstable + public void setAllocationTagsManager( + AllocationTagsManager allocationTagsManager) { + this.allocationTagsManager = allocationTagsManager; + } + + @Private + @Unstable public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() { return rmDelegatedNodeLabelsUpdater; } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index ec94030..62899d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; +import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -166,4 +167,8 @@ public interface RMContext extends ApplicationMasterServiceContext { void setResourceProfilesManager(ResourceProfilesManager mgr); String getAppProxyUrl(Configuration conf, ApplicationId applicationId); + + AllocationTagsManager getAllocationTagsManager(); + + void 
setAllocationTagsManager(AllocationTagsManager allocationTagsManager); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 80a9109..315fdc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; +import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -504,6 +505,17 @@ public class RMContextImpl implements RMContext { } @Override + public AllocationTagsManager getAllocationTagsManager() { + return activeServiceContext.getAllocationTagsManager(); + } + + @Override + 
public void setAllocationTagsManager( + AllocationTagsManager allocationTagsManager) { + activeServiceContext.setAllocationTagsManager(allocationTagsManager); + } + + @Override public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() { return activeServiceContext.getRMDelegatedNodeLabelsUpdater(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 32c4b0a..da0feda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Pu import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -493,6 +494,10 @@ public class ResourceManager extends CompositeService implements Recoverable { throws InstantiationException, IllegalAccessException { return new RMNodeLabelsManager(); } + + protected AllocationTagsManager createAllocationTagsManager() { + return new AllocationTagsManager(); + } protected DelegationTokenRenewer createDelegationTokenRenewer() { return new DelegationTokenRenewer(); @@ -619,6 +624,10 @@ public class ResourceManager extends CompositeService implements Recoverable { addService(nlm); rmContext.setNodeLabelManager(nlm); + AllocationTagsManager allocationTagsManager = + createAllocationTagsManager(); + rmContext.setAllocationTagsManager(allocationTagsManager); + RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater = createRMDelegatedNodeLabelsUpdater(); if (delegatedNodeLabelsUpdater != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java new file mode 100644 index 0000000..b67fab9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java @@ -0,0 +1,431 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.constraint; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.LongBinaryOperator; + +/** + * Support storing maps between container-tags/applications and + * nodes. This will be required by affinity/anti-affinity implementation and + * cardinality. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class AllocationTagsManager { + + private static final Logger LOG = Logger.getLogger( + AllocationTagsManager.class); + + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + + // Application's tags to node + private Map<ApplicationId, NodeToCountedTags> perAppMappings = + new HashMap<>(); + + // Global tags to node mapping (used to fast return aggregated tags + // cardinality across apps) + private NodeToCountedTags globalMapping = new NodeToCountedTags(); + + /** + * Store node to counted tags. + */ + @VisibleForTesting + static class NodeToCountedTags { + // Map<NodeId, Map<Tag, Count>> + private Map<NodeId, Map<String, Long>> nodeToTagsWithCount = + new HashMap<>(); + + // protected by external locks + private void addTagsToNode(NodeId nodeId, Set<String> tags) { + Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId, + k -> new HashMap<>()); + + for (String tag : tags) { + Long count = innerMap.get(tag); + if (count == null) { + innerMap.put(tag, 1L); + } else{ + innerMap.put(tag, count + 1); + } + } + } + + // protected by external locks + private void addTagToNode(NodeId nodeId, String tag) { + Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId, + k -> new HashMap<>()); + + Long count = innerMap.get(tag); + if (count == null) { + innerMap.put(tag, 1L); + } else{ + innerMap.put(tag, count + 1); + } + } + + private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) { + Long count = innerMap.get(tag); + if (count > 1) { + innerMap.put(tag, count - 1); + } else { + if (count <= 0) { + LOG.warn( + "Trying to remove tags from node, however the count already" + + " becomes 0 or less, it could be a potential bug."); + } + innerMap.remove(tag); + } + } + + private void removeTagsFromNode(NodeId nodeId, Set<String> tags) { + Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); + if 
(innerMap == null) { + LOG.warn("Failed to find node=" + nodeId + + " while trying to remove tags, please double check."); + return; + } + + for (String tag : tags) { + removeTagFromInnerMap(innerMap, tag); + } + + if (innerMap.isEmpty()) { + nodeToTagsWithCount.remove(nodeId); + } + } + + private void removeTagFromNode(NodeId nodeId, String tag) { + Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); + if (innerMap == null) { + LOG.warn("Failed to find node=" + nodeId + + " while trying to remove tags, please double check."); + return; + } + + removeTagFromInnerMap(innerMap, tag); + + if (innerMap.isEmpty()) { + nodeToTagsWithCount.remove(nodeId); + } + } + + private long getCardinality(NodeId nodeId, String tag) { + Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); + if (innerMap == null) { + return 0; + } + Long value = innerMap.get(tag); + return value == null ? 0 : value; + } + + private long getCardinality(NodeId nodeId, Set<String> tags, + LongBinaryOperator op) { + Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); + if (innerMap == null) { + return 0; + } + + long returnValue = 0; + boolean firstTag = true; + + if (tags != null && !tags.isEmpty()) { + for (String tag : tags) { + Long value = innerMap.get(tag); + if (value == null) { + value = 0L; + } + + if (firstTag) { + returnValue = value; + firstTag = false; + continue; + } + + returnValue = op.applyAsLong(returnValue, value); + } + } else { + // Similar to above if, but only iterate values for better performance + for (long value : innerMap.values()) { + // For the first value, we will not apply op + if (firstTag) { + returnValue = value; + firstTag = false; + continue; + } + returnValue = op.applyAsLong(returnValue, value); + } + } + return returnValue; + } + + private boolean isEmpty() { + return nodeToTagsWithCount.isEmpty(); + } + + @VisibleForTesting + public Map<NodeId, Map<String, Long>> getNodeToTagsWithCount() { + return nodeToTagsWithCount; + } + } + + 
@VisibleForTesting + Map<ApplicationId, NodeToCountedTags> getPerAppMappings() { + return perAppMappings; + } + + @VisibleForTesting + NodeToCountedTags getGlobalMapping() { + return globalMapping; + } + + public AllocationTagsManager() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + /** + * Notify container allocated on a node. + * + * @param nodeId allocated node. + * @param applicationId applicationId + * @param containerId container id. + * @param allocationTags allocation tags, see + * {@link SchedulingRequest#getAllocationTags()} + * application_id will be added to allocationTags. + */ + public void addContainer(NodeId nodeId, ApplicationId applicationId, + ContainerId containerId, Set<String> allocationTags) { + String applicationIdTag = + AllocationTagsNamespaces.APP_ID + applicationId.toString(); + + boolean useSet = false; + if (allocationTags != null && !allocationTags.isEmpty()) { + // Copy before edit it. + allocationTags = new HashSet<>(allocationTags); + allocationTags.add(applicationIdTag); + useSet = true; + } + + writeLock.lock(); + try { + NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent( + applicationId, k -> new NodeToCountedTags()); + + if (useSet) { + perAppTagsMapping.addTagsToNode(nodeId, allocationTags); + globalMapping.addTagsToNode(nodeId, allocationTags); + } else { + perAppTagsMapping.addTagToNode(nodeId, applicationIdTag); + globalMapping.addTagToNode(nodeId, applicationIdTag); + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Added container=" + containerId + " with tags=[" + StringUtils + .join(allocationTags, ",") + "]"); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Notify container removed. + * + * @param nodeId nodeId + * @param applicationId applicationId + * @param containerId containerId. 
+ * @param allocationTags allocation tags for given container + */ + public void removeContainer(NodeId nodeId, ApplicationId applicationId, + ContainerId containerId, Set<String> allocationTags) { + String applicationIdTag = + AllocationTagsNamespaces.APP_ID + applicationId.toString(); + boolean useSet = false; + + if (allocationTags != null && !allocationTags.isEmpty()) { + // Copy before edit it. + allocationTags = new HashSet<>(allocationTags); + allocationTags.add(applicationIdTag); + useSet = true; + } + + writeLock.lock(); + try { + NodeToCountedTags perAppTagsMapping = perAppMappings.get(applicationId); + if (perAppTagsMapping == null) { + return; + } + + if (useSet) { + perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags); + globalMapping.removeTagsFromNode(nodeId, allocationTags); + } else { + perAppTagsMapping.removeTagFromNode(nodeId, applicationIdTag); + globalMapping.removeTagFromNode(nodeId, applicationIdTag); + } + + if (perAppTagsMapping.isEmpty()) { + perAppMappings.remove(applicationId); + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Removed container=" + containerId + " with tags=[" + StringUtils + .join(allocationTags, ",") + "]"); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Get cardinality for following conditions. External can pass-in a binary op + * to implement customized logic. * + * @param nodeId nodeId, required. + * @param applicationId applicationId. When null is specified, return + * aggregated cardinality among all nodes. + * @param tag allocation tag, see + * {@link SchedulingRequest#getAllocationTags()}, + * When multiple tags specified. Returns cardinality + * depends on op. If a specified tag doesn't exist, + * 0 will be its cardinality. + * When null/empty tags specified, all tags + * (of the node/app) will be considered. + * @return cardinality of specified query on the node. 
+ * @throws InvalidAllocationTagsQueryException when illegal query + * parameter specified + */ + public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId, + String tag) throws InvalidAllocationTagsQueryException { + readLock.lock(); + + try { + if (nodeId == null) { + throw new InvalidAllocationTagsQueryException( + "Must specify nodeId/tags/op to query cardinality"); + } + + NodeToCountedTags mapping; + if (applicationId != null) { + mapping = perAppMappings.get(applicationId); + } else{ + mapping = globalMapping; + } + + if (mapping == null) { + return 0; + } + + return mapping.getCardinality(nodeId, tag); + } finally { + readLock.unlock(); + } + } + + /** + * Check if given tag exists on node. + * + * @param nodeId nodeId, required. + * @param applicationId applicationId. When null is specified, return + * aggregated cardinality among all nodes. + * @param tag allocation tag, see + * {@link SchedulingRequest#getAllocationTags()}, + * When multiple tags specified. Returns cardinality + * depends on op. If a specified tag doesn't exist, + * 0 will be its cardinality. + * When null/empty tags specified, all tags + * (of the node/app) will be considered. + * @return true if the cardinality of the given tag on the node is + * greater than 0, false otherwise. + * @throws InvalidAllocationTagsQueryException when illegal query + * parameter specified + */ + public boolean allocationTagExistsOnNode(NodeId nodeId, + ApplicationId applicationId, String tag) + throws InvalidAllocationTagsQueryException { + return getNodeCardinality(nodeId, applicationId, tag) > 0; + } + + /** + * Get cardinality for following conditions. External can pass-in a binary op + * to implement customized logic. + * + * @param nodeId nodeId, required. + * @param applicationId applicationId. When null is specified, return + * aggregated cardinality among all nodes. + * @param tags allocation tags, see + * {@link SchedulingRequest#getAllocationTags()}, + * When multiple tags specified. Returns cardinality + * depends on op. 
If a specified tag doesn't exist, 0 + * will be its cardinality. When null/empty tags + * specified, all tags (of the node/app) will be + * considered. + * @param op operator. Such as Long::max, Long::sum, etc. Required. + * This parameter only takes effect when #values >= 2. + * @return cardinality of specified query on the node. + * @throws InvalidAllocationTagsQueryException when illegal query + * parameter specified + */ + public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId, + Set<String> tags, LongBinaryOperator op) + throws InvalidAllocationTagsQueryException { + readLock.lock(); + + try { + if (nodeId == null || op == null) { + throw new InvalidAllocationTagsQueryException( + "Must specify nodeId/tags/op to query cardinality"); + } + + NodeToCountedTags mapping; + if (applicationId != null) { + mapping = perAppMappings.get(applicationId); + } else{ + mapping = globalMapping; + } + + if (mapping == null) { + return 0; + } + + return mapping.getCardinality(nodeId, tags, op); + } finally { + readLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java new file mode 100644 index 0000000..893ff1c --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java @@ -0,0 +1,31 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.constraint; + +/** + * Predefined namespaces for tags + * + * Same as namespace of resource types. 
Namespaces of placement tags start + * with a letter and end with "/" + */ +public class AllocationTagsNamespaces { + public static final String APP_ID = "yarn_app_id/"; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java new file mode 100644 index 0000000..5519e39 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java @@ -0,0 +1,35 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.constraint; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Exception when invalid parameter specified to do placement tags related + * queries. + */ +public class InvalidAllocationTagsQueryException extends YarnException { + private static final long serialVersionUID = 12312831974894L; + + public InvalidAllocationTagsQueryException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index f3cbf63..8f751b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import java.util.List; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; 
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -115,4 +117,10 @@ public interface RMContainer extends EventHandler<RMContainerEvent>, boolean completed(); NodeId getNodeId(); + + /** + * Return {@link SchedulingRequest#getAllocationTags()} specified by AM. + * @return allocation tags, could be null/empty + */ + Set<String> getAllocationTags(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index e26689e..184cdfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Set; import 
java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -189,6 +190,9 @@ public class RMContainerImpl implements RMContainer { private boolean isExternallyAllocated; private SchedulerRequestKey allocatedSchedulerKey; + // TODO, set it when container allocated by scheduler (From SchedulingRequest) + private Set<String> allocationTags = null; + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext) { @@ -501,6 +505,11 @@ public class RMContainerImpl implements RMContainer { return nodeId; } + @Override + public Set<String> getAllocationTags() { + return allocationTags; + } + private static class BaseTransition implements SingleArcTransition<RMContainerImpl, RMContainerEvent> { @@ -565,6 +574,12 @@ public class RMContainerImpl implements RMContainer { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + // Notify AllocationTagsManager + container.rmContext.getAllocationTagsManager().addContainer( + container.getNodeId(), + container.getApplicationAttemptId().getApplicationId(), + container.getContainerId(), container.getAllocationTags()); + container.eventHandler.handle(new RMAppAttemptEvent( container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED)); } @@ -676,6 +691,12 @@ public class RMContainerImpl implements RMContainer { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + // Notify AllocationTagsManager + container.rmContext.getAllocationTagsManager().removeContainer( + container.getNodeId(), + container.getApplicationAttemptId().getApplicationId(), + container.getContainerId(), container.getAllocationTags()); + RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event; container.finishTime = System.currentTimeMillis(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java
new file mode 100644
index 0000000..0358792
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java
@@ -0,0 +1,328 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.constraint;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test add/remove bookkeeping and cardinality queries of AllocationTagsManager.
+ */
+public class TestAllocationTagsManager {
+  @Test
+  public void testAllocationTagsManagerSimpleCases()
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagsManager atm = new AllocationTagsManager();
+
+    /*
+     * Construct test case:
+     * Node1:
+     *    container_1_1 (mapper/reducer/app_1)
+     *    container_1_3 (service/app_1)
+     *
+     * Node2:
+     *    container_1_2 (mapper/reducer/app_1)
+     *    container_1_4 (reducer/app_1)
+     *    container_2_3 (service/app_2)
+     */
+
+    // 4 Containers from app1
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+        ImmutableSet.of("service"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+        ImmutableSet.of("reducer"));
+
+    // 1 Container from app2
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+        ImmutableSet.of("service"));
+
+    // Get Cardinality of app1 on node1, with tag "mapper"
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            Long::max));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::min));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
+    Assert.assertEquals(2,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::max));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
+    Assert.assertEquals(3,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::sum));
+
+    // Get Cardinality by passing single tag.
+    Assert.assertEquals(1,
+        atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), "mapper"));
+
+    Assert.assertEquals(2,
+        atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), "reducer"));
+
+    // Get Cardinality of app1 on node2, with tag "no_existed/reducer", op=min
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("no_existed", "reducer"), Long::min));
+
+    // Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
+    // (Expect this returns #containers from app1 on node2)
+    Assert.assertEquals(2,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet
+                .of(AllocationTagsNamespaces.APP_ID + TestUtils
+                    .getMockApplicationId(1).toString()), Long::max));
+
+    // Get Cardinality of app1 on node2, with empty tag set, op=max
+    Assert.assertEquals(2,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
+
+    // Get Cardinality of all apps on node2, with empty tag set, op=sum
+    Assert.assertEquals(7,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
+            ImmutableSet.of(), Long::sum));
+
+    // Get Cardinality of app_1 on node2, with empty tag set, op=sum
+    Assert.assertEquals(5,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
+
+    // Get Cardinality of app_2 on node2, with empty tag set, op=sum
+    Assert.assertEquals(2,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
+
+    // Finish all containers:
+    atm.removeContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.removeContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+        ImmutableSet.of("service"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+        ImmutableSet.of("reducer"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+        ImmutableSet.of("service"));
+
+    // Expect all cardinality to be 0
+    // Get Cardinality of app1 on node1, with tag "mapper"
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            Long::max));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::min));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::max));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::sum));
+
+    // Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
+    // (Expect 0 since all containers of app1 on node2 have finished)
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet
+                .of(AllocationTagsNamespaces.APP_ID + TestUtils
+                    .getMockApplicationId(1).toString()), Long::max));
+
+    Assert.assertEquals(0,
+        atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), AllocationTagsNamespaces.APP_ID
+                + TestUtils.getMockApplicationId(1).toString()));
+
+    // Get Cardinality of app1 on node2, with empty tag set, op=max
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
+
+    // Get Cardinality of all apps on node2, with empty tag set, op=sum
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
+            ImmutableSet.of(), Long::sum));
+
+    // Get Cardinality of app_1 on node2, with empty tag set, op=sum
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
+
+    // Get Cardinality of app_2 on node2, with empty tag set, op=sum
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
+  }
+
+  @Test
+  public void testAllocationTagsManagerMemoryAfterCleanup()
+      throws InvalidAllocationTagsQueryException {
+    /*
+     * Make sure YARN cleans up all memory once container/app finishes.
+     */
+
+    AllocationTagsManager atm = new AllocationTagsManager();
+
+    // Add a bunch of containers
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+        ImmutableSet.of("service"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+        ImmutableSet.of("reducer"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+        ImmutableSet.of("service"));
+
+    // Remove all these containers
+    atm.removeContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.removeContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+        ImmutableSet.of("service"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+        ImmutableSet.of("reducer"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+        ImmutableSet.of("service"));
+
+    // Check internal data structure
+    Assert.assertEquals(0,
+        atm.getGlobalMapping().getNodeToTagsWithCount().size());
+    Assert.assertEquals(0, atm.getPerAppMappings().size());
+  }
+
+  @Test
+  public void testQueryCardinalityWithIllegalParameters()
+      throws InvalidAllocationTagsQueryException {
+    /*
+     * Make sure queries with a missing node id or operator are rejected.
+     */
+
+    AllocationTagsManager atm = new AllocationTagsManager();
+
+    // Add a bunch of containers
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+        ImmutableSet.of("service"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+        ImmutableSet.of("reducer"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+        ImmutableSet.of("service"));
+
+    // No node-id
+    boolean caughtException = false;
+    try {
+      atm.getNodeCardinalityByOp(null, TestUtils.getMockApplicationId(2),
+          ImmutableSet.of("mapper"), Long::min);
+    } catch (InvalidAllocationTagsQueryException e) {
+      caughtException = true;
+    }
+    Assert.assertTrue("should fail because nodeId is not specified",
+        caughtException);
+
+    // No op
+    caughtException = false;
+    try {
+      atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+          TestUtils.getMockApplicationId(2), ImmutableSet.of("mapper"), null);
+    } catch (InvalidAllocationTagsQueryException e) {
+      caughtException = true;
+    }
+    Assert.assertTrue("should fail because op is not specified",
+        caughtException);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 6c189b3..27ff311 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; @@ -109,6 +110,8 @@ public class TestRMContainerImpl { when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); when(rmContext.getRMApps()).thenReturn(rmApps); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); + 
AllocationTagsManager ptm = mock(AllocationTagsManager.class); + when(rmContext.getAllocationTagsManager()).thenReturn(ptm); YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean( YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, @@ -209,6 +212,8 @@ public class TestRMContainerImpl { when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); + AllocationTagsManager ptm = mock(AllocationTagsManager.class); + when(rmContext.getAllocationTagsManager()).thenReturn(ptm); YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean( @@ -367,4 +372,123 @@ public class TestRMContainerImpl { verify(publisher, times(1)).containerCreated(any(RMContainer.class), anyLong()); verify(publisher, times(1)).containerFinished(any(RMContainer.class), anyLong()); } + + @Test + public void testContainerTransitionNotifyPlacementTagsManager() + throws Exception { + DrainDispatcher drainDispatcher = new DrainDispatcher(); + EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock( + EventHandler.class); + EventHandler generic = mock(EventHandler.class); + drainDispatcher.register(RMAppAttemptEventType.class, + appAttemptEventHandler); + drainDispatcher.register(RMNodeEventType.class, generic); + drainDispatcher.init(new YarnConfiguration()); + drainDispatcher.start(); + NodeId nodeId = BuilderUtils.newNodeId("host", 3425); + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 1); + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); + ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); + + Resource resource = BuilderUtils.newResource(512, 1); + Priority priority = BuilderUtils.newPriority(5); + + Container container = BuilderUtils.newContainer(containerId, nodeId, + 
"host:3465", resource, priority, null); + ConcurrentMap<ApplicationId, RMApp> rmApps = + spy(new ConcurrentHashMap<ApplicationId, RMApp>()); + RMApp rmApp = mock(RMApp.class); + when(rmApp.getRMAppAttempt(Matchers.any())).thenReturn(null); + Mockito.doReturn(rmApp).when(rmApps).get(Matchers.any()); + + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); + AllocationTagsManager tagsManager = new AllocationTagsManager(); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); + when(rmContext.getRMApps()).thenReturn(rmApps); + when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); + when(rmContext.getAllocationTagsManager()).thenReturn(tagsManager); + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, + true); + when(rmContext.getYarnConfiguration()).thenReturn(conf); + + /* First container: ALLOCATED -> KILLED */ + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, + nodeId, "user", rmContext); + + Assert.assertEquals(0, + tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.START)); + + Assert.assertEquals(1, + tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + + rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.KILL)); + + Assert.assertEquals(0, + tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + + /* Second container: ACQUIRED -> FINISHED */ + rmContainer = new 
RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, + nodeId, "user", rmContext); + + Assert.assertEquals(0, + tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.START)); + + Assert.assertEquals(1, + tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); + + rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.FINISHED)); + + Assert.assertEquals(0, + tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + + /* Third container: RUNNING -> FINISHED */ + rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, + nodeId, "user", rmContext); + + Assert.assertEquals(0, + tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.START)); + + Assert.assertEquals(1, + tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); + + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); + + rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.FINISHED)); + + Assert.assertEquals(0, + tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index e3326c7..61a5555 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -135,6 +136,9 @@ public class TestUtils { new DefaultResourceCalculator()); rmContext.setScheduler(mockScheduler); + AllocationTagsManager ptm = mock(AllocationTagsManager.class); + rmContext.setAllocationTagsManager(ptm); + return rmContext; } @@ -234,6 +238,11 @@ public class TestUtils { doReturn(id).when(containerId).getContainerId(); return containerId; } + + public static ContainerId getMockContainerId(int appId, int containerId) { + ApplicationAttemptId attemptId = 
getMockApplicationAttemptId(appId, 1); + return ContainerId.newContainerId(attemptId, containerId); + } public static Container getMockContainer( ContainerId containerId, NodeId nodeId, http://git-wip-us.apache.org/repos/asf/hadoop/blob/801c0988/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 3f97b59..4b902a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -234,6 +235,8 @@ public class 
TestFifoScheduler { FifoScheduler scheduler = new FifoScheduler(); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler); + AllocationTagsManager ptm = mock(AllocationTagsManager.class); + rmContext.setAllocationTagsManager(ptm); rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); rmContext.setRMApplicationHistoryWriter( mock(RMApplicationHistoryWriter.class)); @@ -312,12 +315,14 @@ public class TestFifoScheduler { FifoScheduler scheduler = new FifoScheduler(); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler); + AllocationTagsManager ptm = mock(AllocationTagsManager.class); rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class)); ((RMContextImpl) rmContext).setYarnConfiguration(new YarnConfiguration()); NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager(); nlm.init(new Configuration()); rmContext.setNodeLabelManager(nlm); + rmContext.setAllocationTagsManager(ptm); scheduler.setRMContext(rmContext); ((RMContextImpl) rmContext).setScheduler(scheduler); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org