YARN-7612. Add Processor Framework for Rich Placement Constraints. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f7d01a2c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f7d01a2c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f7d01a2c Branch: refs/heads/YARN-6592 Commit: f7d01a2cf23fbf837144d4b2a93efd6d2fd4ab7f Parents: 80af031 Author: Arun Suresh <asur...@apache.org> Authored: Fri Dec 22 15:51:20 2017 -0800 Committer: Arun Suresh <asur...@apache.org> Committed: Tue Jan 30 07:53:34 2018 -0800 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 26 ++ .../src/main/resources/yarn-default.xml | 30 ++ .../ApplicationMasterService.java | 15 + .../rmcontainer/RMContainerImpl.java | 7 +- .../scheduler/capacity/CapacityScheduler.java | 2 + .../constraint/processor/BatchedRequests.java | 105 +++++ .../processor/NodeCandidateSelector.java | 38 ++ .../processor/PlacementDispatcher.java | 145 +++++++ .../processor/PlacementProcessor.java | 343 ++++++++++++++++ .../processor/SamplePlacementAlgorithm.java | 144 +++++++ .../constraint/processor/package-info.java | 29 ++ .../yarn/server/resourcemanager/MockAM.java | 26 ++ .../yarn/server/resourcemanager/MockRM.java | 14 + .../constraint/TestPlacementProcessor.java | 394 +++++++++++++++++++ 14 files changed, 1316 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index bbbfc52..8fb3c2e 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -531,6 +531,32 @@ public class YarnConfiguration extends Configuration { /** The class to use as the resource scheduler.*/ public static final String RM_SCHEDULER = RM_PREFIX + "scheduler.class"; + + /** Placement Algorithm. */ + public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS = + RM_PREFIX + "placement-constraints.algorithm.class"; + + public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED = + RM_PREFIX + "placement-constraints.enabled"; + + public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = true; + + public static final String RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS = + RM_PREFIX + "placement-constraints.retry-attempts"; + + public static final int DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS = 3; + + public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE = + RM_PREFIX + "placement-constraints.algorithm.pool-size"; + + public static final int DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE = + 1; + + public static final String RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE = + RM_PREFIX + "placement-constraints.scheduler.pool-size"; + + public static final int DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE = + 1; public static final String DEFAULT_RM_SCHEDULER = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 
0bb4fca..6d52ace 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -131,6 +131,36 @@ </property> <property> + <description>Enable Constraint Placement.</description> + <name>yarn.resourcemanager.placement-constraints.enabled</name> + <value>false</value> + </property> + + <property> + <description>Number of times to retry placing of rejected SchedulingRequests</description> + <name>yarn.resourcemanager.placement-constraints.retry-attempts</name> + <value>3</value> + </property> + + <property> + <description>Constraint Placement Algorithm to be used.</description> + <name>yarn.resourcemanager.placement-constraints.algorithm.class</name> + <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SamplePlacementAlgorithm</value> + </property> + + <property> + <description>Threadpool size for the Algorithm used for placement constraint processing.</description> + <name>yarn.resourcemanager.placement-constraints.algorithm.pool-size</name> + <value>1</value> + </property> + + <property> + <description>Threadpool size for the Scheduler invocation phase of placement constraint processing.</description> + <name>yarn.resourcemanager.placement-constraints.scheduler.pool-size</name> + <value>1</value> + </property> + + <property> <description> Comma separated class names of ApplicationMasterServiceProcessor implementations. 
The processors will be applied in the order http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 90c42be..aa1177d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementProcessor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; @@ -114,11 +115,25 @@ public class ApplicationMasterService extends AbstractService implements YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + initializeProcessingChain(conf); + } + + private 
void initializeProcessingChain(Configuration conf) { amsProcessingChain.init(rmContext, null); + boolean enablePlacementConstraints = conf.getBoolean( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, + YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED); + if (enablePlacementConstraints) { + amsProcessingChain.addProcessor(new PlacementProcessor()); + } List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf); if (processors != null) { Collections.reverse(processors); for (ApplicationMasterServiceProcessor p : processors) { + // Ensure only single instance of PlacementProcessor is included + if (enablePlacementConstraints && p instanceof PlacementProcessor) { + continue; + } this.amsProcessingChain.addProcessor(p); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 184cdfc..c873509 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -190,8 +190,7 @@ public class RMContainerImpl implements RMContainer { private boolean isExternallyAllocated; private SchedulerRequestKey 
allocatedSchedulerKey; - // TODO, set it when container allocated by scheduler (From SchedulingRequest) - private Set<String> allocationTags = null; + private volatile Set<String> allocationTags = null; public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, @@ -510,6 +509,10 @@ public class RMContainerImpl implements RMContainer { return allocationTags; } + public void setAllocationTags(Set<String> tags) { + this.allocationTags = tags; + } + private static class BaseTransition implements SingleArcTransition<RMContainerImpl, RMContainerEvent> { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 676c0fe..e682d0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2601,6 +2601,8 @@ public class CapacityScheduler extends SchedulerRequestKey.extractFrom(container), appAttempt.getApplicationAttemptId(), container.getNodeId(), appAttempt.getUser(), rmContext, false); + 
((RMContainerImpl)rmContainer).setAllocationTags( + new HashSet<>(schedulingRequest.getAllocationTags())); allocated = new ContainerAllocationProposal<>( getSchedulerContainer(rmContainer, true), http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java new file mode 100644 index 0000000..fe92d2f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A grouping of Scheduling Requests which are sent to the PlacementAlgorithm + * to place as a batch. The placement algorithm tends to give more optimal + * placements if more requests are batched together. + */ +class BatchedRequests implements ConstraintPlacementAlgorithmInput { + + // PlacementAlgorithmOutput attempt - the number of times the requests in this + // batch has been placed but was rejected by the scheduler. + private final int placementAttempt; + + private final ApplicationId applicationId; + private final Collection<SchedulingRequest> requests; + private final Map<String, Set<NodeId>> blacklist = new HashMap<>(); + + BatchedRequests(ApplicationId applicationId, + Collection<SchedulingRequest> requests, int attempt) { + this.applicationId = applicationId; + this.requests = requests; + this.placementAttempt = attempt; + } + + /** + * Get Application Id. + * @return Application Id. + */ + ApplicationId getApplicationId() { + return applicationId; + } + + /** + * Get Collection of SchedulingRequests in this batch. + * @return Collection of Scheduling Requests. + */ + @Override + public Collection<SchedulingRequest> getSchedulingRequests() { + return requests; + } + + /** + * Add a Scheduling request to the batch. 
+ * @param req Scheduling Request. + */ + void addToBatch(SchedulingRequest req) { + requests.add(req); + } + + void addToBlacklist(Set<String> tags, SchedulerNode node) { + if (tags != null && !tags.isEmpty()) { + // We are currently assuming a single allocation tag + // per scheduler request currently. + blacklist.computeIfAbsent(tags.iterator().next(), + k -> new HashSet<>()).add(node.getNodeID()); + } + } + + /** + * Get placement attempt. + * @return PlacementAlgorithmOutput placement Attempt. + */ + int getPlacementAttempt() { + return placementAttempt; + } + + /** + * Get any blacklisted nodes associated with tag. + * @param tag Tag. + * @return Set of blacklisted Nodes. + */ + Set<NodeId> getBlacklist(String tag) { + return blacklist.getOrDefault(tag, Collections.EMPTY_SET); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java new file mode 100644 index 0000000..4299050 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeFilter; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.List; + +/** + * A read only implementation of the ClusterNodeTracker which exposes a method + * to simply return a filtered list of nodes. + */ +public interface NodeCandidateSelector { + + /** + * Select a list of nodes given a filter. + * @param filter a NodeFilter. + * @return List of SchedulerNodes. 
+ */ + List<SchedulerNode> selectNodes(NodeFilter filter); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java new file mode 100644 index 0000000..6a00ba8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * This class initializes the Constraint Placement Algorithm. It dispatches + * input to the algorithm and collects output from it. 
+ */ +class PlacementDispatcher implements + ConstraintPlacementAlgorithmOutputCollector { + + private static final Logger LOG = + LoggerFactory.getLogger(PlacementDispatcher.class); + private ConstraintPlacementAlgorithm algorithm; + private ExecutorService algorithmThreadPool; + + private Map<ApplicationId, List<PlacedSchedulingRequest>> + placedRequests = new ConcurrentHashMap<>(); + private Map<ApplicationId, List<SchedulingRequest>> + rejectedRequests = new ConcurrentHashMap<>(); + + public void init(RMContext rmContext, + ConstraintPlacementAlgorithm placementAlgorithm, int poolSize) { + LOG.info("Initializing Constraint Placement Planner:"); + this.algorithm = placementAlgorithm; + this.algorithm.init(rmContext); + this.algorithmThreadPool = Executors.newFixedThreadPool(poolSize); + } + + void dispatch(final BatchedRequests batchedRequests) { + final ConstraintPlacementAlgorithmOutputCollector collector = this; + Runnable placingTask = () -> { + LOG.debug("Got [{}] requests to place from application [{}].. 
" + + "Attempt count [{}]", + batchedRequests.getSchedulingRequests().size(), + batchedRequests.getApplicationId(), + batchedRequests.getPlacementAttempt()); + algorithm.place(batchedRequests, collector); + }; + this.algorithmThreadPool.submit(placingTask); + } + + public List<PlacedSchedulingRequest> pullPlacedRequests( + ApplicationId applicationId) { + List<PlacedSchedulingRequest> placedReqs = + this.placedRequests.get(applicationId); + if (placedReqs != null && !placedReqs.isEmpty()) { + List<PlacedSchedulingRequest> retList = new ArrayList<>(); + synchronized (placedReqs) { + if (placedReqs.size() > 0) { + retList.addAll(placedReqs); + placedReqs.clear(); + } + } + return retList; + } + return Collections.EMPTY_LIST; + } + + public List<SchedulingRequest> pullRejectedRequests( + ApplicationId applicationId) { + List<SchedulingRequest> rejectedReqs = + this.rejectedRequests.get(applicationId); + if (rejectedReqs != null && !rejectedReqs.isEmpty()) { + List<SchedulingRequest> retList = new ArrayList<>(); + synchronized (rejectedReqs) { + if (rejectedReqs.size() > 0) { + retList.addAll(rejectedReqs); + rejectedReqs.clear(); + } + } + return retList; + } + return Collections.EMPTY_LIST; + } + + void clearApplicationState(ApplicationId applicationId) { + placedRequests.remove(applicationId); + rejectedRequests.remove(applicationId); + } + + @Override + public void collect(ConstraintPlacementAlgorithmOutput placement) { + if (!placement.getPlacedRequests().isEmpty()) { + List<PlacedSchedulingRequest> processed = + placedRequests.computeIfAbsent( + placement.getApplicationId(), k -> new ArrayList<>()); + synchronized (processed) { + LOG.debug( + "Planning Algorithm has placed for application [{}]" + + " the following [{}]", placement.getApplicationId(), + placement.getPlacedRequests()); + for (PlacedSchedulingRequest esr : + placement.getPlacedRequests()) { + processed.add(esr); + } + } + } + if (!placement.getRejectedRequests().isEmpty()) { + 
List<SchedulingRequest> rejected = + rejectedRequests.computeIfAbsent( + placement.getApplicationId(), k -> new ArrayList()); + LOG.warn( + "Planning Algorithm has rejected for application [{}]" + + " the following [{}]", placement.getApplicationId(), + placement.getRejectedRequests()); + synchronized (rejected) { + rejected.addAll(placement.getRejectedRequests()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java new file mode 100644 index 0000000..d613d4e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java @@ -0,0 +1,343 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; +import org.apache.hadoop.yarn.api.records.RejectionReason; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** + * An ApplicationMasterService Processor that performs Constrained placement of + * Scheduling Requests. It does the following: + * 1. All initialization. + * 2. Intercepts placement constraints from the register call and adds it to + * the placement constraint manager. + * 3. Dispatches Scheduling Requests to the Planner. + */ +public class PlacementProcessor implements ApplicationMasterServiceProcessor { + + /** + * Wrapper over the SchedulingResponse that wires in the placement attempt + * and last attempted Node. 
+ */ + static final class Response extends SchedulingResponse { + + private final int placementAttempt; + private final SchedulerNode attemptedNode; + + private Response(boolean isSuccess, ApplicationId applicationId, + SchedulingRequest schedulingRequest, int placementAttempt, + SchedulerNode attemptedNode) { + super(isSuccess, applicationId, schedulingRequest); + this.placementAttempt = placementAttempt; + this.attemptedNode = attemptedNode; + } + } + + private static final Logger LOG = + LoggerFactory.getLogger(PlacementProcessor.class); + private PlacementConstraintManager constraintManager; + private ApplicationMasterServiceProcessor nextAMSProcessor; + + private AbstractYarnScheduler scheduler; + private ExecutorService schedulingThreadPool; + private int retryAttempts; + private Map<ApplicationId, List<BatchedRequests>> requestsToRetry = + new ConcurrentHashMap<>(); + private Map<ApplicationId, List<SchedulingRequest>> requestsToReject = + new ConcurrentHashMap<>(); + + private PlacementDispatcher placementDispatcher; + + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor nextProcessor) { + LOG.info("Initializing Constraint Placement Processor:"); + this.nextAMSProcessor = nextProcessor; + this.constraintManager = + ((RMContextImpl)amsContext).getPlacementConstraintManager(); + + this.scheduler = + (AbstractYarnScheduler)((RMContextImpl)amsContext).getScheduler(); + // Only the first class is considered - even if a comma separated + // list is provided. 
(This is for simplicity, since getInstances does a + // lot of good things by handling things correctly) + List<ConstraintPlacementAlgorithm> instances = + ((RMContextImpl) amsContext).getYarnConfiguration().getInstances( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS, + ConstraintPlacementAlgorithm.class); + ConstraintPlacementAlgorithm algorithm = null; + if (instances != null && !instances.isEmpty()) { + algorithm = instances.get(0); + } else { + algorithm = new SamplePlacementAlgorithm(); + } + LOG.info("Planning Algorithm [{}]", algorithm.getClass().getName()); + + int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE); + this.placementDispatcher = new PlacementDispatcher(); + this.placementDispatcher.init( + ((RMContextImpl)amsContext), algorithm, algoPSize); + LOG.info("Planning Algorithm pool size [{}]", algoPSize); + + int schedPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE); + this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize); + LOG.info("Scheduler pool size [{}]", schedPSize); + + // Number of times a request that is not satisfied by the scheduler + // can be retried. 
+ this.retryAttempts = + ((RMContextImpl) amsContext).getYarnConfiguration().getInt( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS); + LOG.info("Num retry attempts [{}]", this.retryAttempts); + } + + @Override + public void registerApplicationMaster(ApplicationAttemptId appAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + Map<Set<String>, PlacementConstraint> appPlacementConstraints = + request.getPlacementConstraints(); + processPlacementConstraints( + appAttemptId.getApplicationId(), appPlacementConstraints); + nextAMSProcessor.registerApplicationMaster(appAttemptId, request, response); + } + + private void processPlacementConstraints(ApplicationId applicationId, + Map<Set<String>, PlacementConstraint> appPlacementConstraints) { + if (appPlacementConstraints != null && !appPlacementConstraints.isEmpty()) { + LOG.info("Constraints added for application [{}] against tags [{}]", + applicationId, appPlacementConstraints); + constraintManager.registerApplication( + applicationId, appPlacementConstraints); + } + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + List<SchedulingRequest> schedulingRequests = + request.getSchedulingRequests(); + dispatchRequestsForPlacement(appAttemptId, schedulingRequests); + reDispatchRetryableRequests(appAttemptId); + schedulePlacedRequests(appAttemptId); + + nextAMSProcessor.allocate(appAttemptId, request, response); + + handleRejectedRequests(appAttemptId, response); + } + + private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId, + List<SchedulingRequest> schedulingRequests) { + if (schedulingRequests != null && !schedulingRequests.isEmpty()) { + this.placementDispatcher.dispatch( + new BatchedRequests(appAttemptId.getApplicationId(), + 
schedulingRequests, 1)); + } + } + + private void reDispatchRetryableRequests(ApplicationAttemptId appAttId) { + List<BatchedRequests> reqsToRetry = + this.requestsToRetry.get(appAttId.getApplicationId()); + if (reqsToRetry != null && !reqsToRetry.isEmpty()) { + synchronized (reqsToRetry) { + for (BatchedRequests bReq: reqsToRetry) { + this.placementDispatcher.dispatch(bReq); + } + reqsToRetry.clear(); + } + } + } + + private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) { + ApplicationId applicationId = appAttemptId.getApplicationId(); + List<PlacedSchedulingRequest> placedSchedulingRequests = + this.placementDispatcher.pullPlacedRequests(applicationId); + for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) { + SchedulingRequest sReq = placedReq.getSchedulingRequest(); + for (SchedulerNode node : placedReq.getNodes()) { + final SchedulingRequest sReqClone = + SchedulingRequest.newInstance(sReq.getAllocationRequestId(), + sReq.getPriority(), sReq.getExecutionType(), + sReq.getAllocationTags(), + ResourceSizing.newInstance( + sReq.getResourceSizing().getResources()), + sReq.getPlacementConstraint()); + SchedulerApplicationAttempt applicationAttempt = + this.scheduler.getApplicationAttempt(appAttemptId); + Runnable task = () -> { + boolean success = + scheduler.attemptAllocationOnNode( + applicationAttempt, sReqClone, node); + if (!success) { + LOG.warn("Unsuccessful allocation attempt [{}] for [{}]", + placedReq.getPlacementAttempt(), sReqClone); + } + handleSchedulingResponse( + new Response(success, applicationId, sReqClone, + placedReq.getPlacementAttempt(), node)); + }; + this.schedulingThreadPool.submit(task); + } + } + } + + private void handleRejectedRequests(ApplicationAttemptId appAttemptId, + AllocateResponse response) { + List<SchedulingRequest> rejectedRequests = + this.placementDispatcher.pullRejectedRequests( + appAttemptId.getApplicationId()); + if (rejectedRequests != null && !rejectedRequests.isEmpty()) { + 
LOG.warn("Following requests of [{}] were rejected by" + + " the PlacementAlgorithmOutput Algorithm: {}", + appAttemptId.getApplicationId(), rejectedRequests); + ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response, + rejectedRequests.stream() + .map(sr -> RejectedSchedulingRequest.newInstance( + RejectionReason.COULD_NOT_PLACE_ON_NODE, sr)) + .collect(Collectors.toList())); + } + rejectedRequests = + this.requestsToReject.get(appAttemptId.getApplicationId()); + if (rejectedRequests != null && !rejectedRequests.isEmpty()) { + synchronized (rejectedRequests) { + LOG.warn("Following requests of [{}] exhausted all retry attempts " + + "trying to schedule on placed node: {}", + appAttemptId.getApplicationId(), rejectedRequests); + ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response, + rejectedRequests.stream() + .map(sr -> RejectedSchedulingRequest.newInstance( + RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, sr)) + .collect(Collectors.toList())); + rejectedRequests.clear(); + } + } + } + + @Override + public void finishApplicationMaster(ApplicationAttemptId appAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + constraintManager.unregisterApplication(appAttemptId.getApplicationId()); + placementDispatcher.clearApplicationState(appAttemptId.getApplicationId()); + requestsToReject.remove(appAttemptId.getApplicationId()); + requestsToRetry.remove(appAttemptId.getApplicationId()); + nextAMSProcessor.finishApplicationMaster(appAttemptId, request, response); + } + + private void handleSchedulingResponse(SchedulingResponse schedulerResponse) { + int placementAttempt = ((Response)schedulerResponse).placementAttempt; + // Retry this placement as it is not successful and we are still + // under max retry. 
The req is batched with other unsuccessful + // requests from the same app + if (!schedulerResponse.isSuccess() && placementAttempt < retryAttempts) { + List<BatchedRequests> reqsToRetry = + requestsToRetry.computeIfAbsent( + schedulerResponse.getApplicationId(), + k -> new ArrayList<>()); + synchronized (reqsToRetry) { + addToRetryList(schedulerResponse, placementAttempt, reqsToRetry); + } + LOG.warn("Going to retry request for application [{}] after [{}]" + + " attempts: [{}]", schedulerResponse.getApplicationId(), + placementAttempt, schedulerResponse.getSchedulingRequest()); + } else { + if (!schedulerResponse.isSuccess()) { + LOG.warn("Not retrying request for application [{}] after [{}]" + + " attempts: [{}]", schedulerResponse.getApplicationId(), + placementAttempt, schedulerResponse.getSchedulingRequest()); + List<SchedulingRequest> reqsToReject = + requestsToReject.computeIfAbsent( + schedulerResponse.getApplicationId(), + k -> new ArrayList<>()); + synchronized (reqsToReject) { + reqsToReject.add(schedulerResponse.getSchedulingRequest()); + } + } + } + } + + private void addToRetryList(SchedulingResponse schedulerResponse, + int placementAttempt, List<BatchedRequests> reqsToRetry) { + boolean isAdded = false; + for (BatchedRequests br : reqsToRetry) { + if (br.getPlacementAttempt() == placementAttempt + 1) { + br.addToBatch(schedulerResponse.getSchedulingRequest()); + br.addToBlacklist( + schedulerResponse.getSchedulingRequest().getAllocationTags(), + ((Response) schedulerResponse).attemptedNode); + isAdded = true; + break; + } + } + if (!isAdded) { + BatchedRequests br = + new BatchedRequests(schedulerResponse.getApplicationId(), + Collections.singleton( + schedulerResponse.getSchedulingRequest()), + placementAttempt + 1); + reqsToRetry.add(br); + br.addToBlacklist( + schedulerResponse.getSchedulingRequest().getAllocationTags(), + ((Response) schedulerResponse).attemptedNode); + } + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java new file mode 100644 index 0000000..8d49801 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SpecializedConstraintTransformer; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Sample Test algorithm. Assumes anti-affinity always + * It also assumes the numAllocations in resource sizing is always = 1 + * + * NOTE: This is just a sample implementation. 
Not to be actually used + */ +public class SamplePlacementAlgorithm implements ConstraintPlacementAlgorithm { + + private static final Logger LOG = + LoggerFactory.getLogger(SamplePlacementAlgorithm.class); + + private AllocationTagsManager tagsManager; + private PlacementConstraintManager constraintManager; + private NodeCandidateSelector nodeSelector; + + @Override + public void init(RMContext rmContext) { + this.tagsManager = rmContext.getAllocationTagsManager(); + this.constraintManager = rmContext.getPlacementConstraintManager(); + this.nodeSelector = + filter -> ((AbstractYarnScheduler)(rmContext) + .getScheduler()).getNodes(filter); + } + + @Override + public void place(ConstraintPlacementAlgorithmInput input, + ConstraintPlacementAlgorithmOutputCollector collector) { + BatchedRequests requests = (BatchedRequests)input; + ConstraintPlacementAlgorithmOutput resp = + new ConstraintPlacementAlgorithmOutput(requests.getApplicationId()); + List<SchedulerNode> allNodes = nodeSelector.selectNodes(null); + Map<String, List<SchedulingRequest>> tagIndexedRequests = new HashMap<>(); + requests.getSchedulingRequests() + .stream() + .filter(r -> r.getAllocationTags() != null) + .forEach( + req -> req.getAllocationTags().forEach( + tag -> tagIndexedRequests.computeIfAbsent(tag, + k -> new ArrayList<>()).add(req)) + ); + for (Map.Entry<String, List<SchedulingRequest>> entry : + tagIndexedRequests.entrySet()) { + String tag = entry.getKey(); + PlacementConstraint constraint = + constraintManager.getConstraint(requests.getApplicationId(), + Collections.singleton(tag)); + if (constraint != null) { + // Currently works only for simple anti-affinity + // NODE scope target expressions + SpecializedConstraintTransformer transformer = + new SpecializedConstraintTransformer(constraint); + PlacementConstraint transform = transformer.transform(); + TargetConstraint targetConstraint = + (TargetConstraint) transform.getConstraintExpr(); + // Assume a single target expression tag; + // The 
Sample Algorithm assumes a constraint will always be a simple + // Target Constraint with a single entry in the target set. + // As mentioned in the class javadoc - This algorithm should be + // used mostly for testing and validating end-2-end workflow. + String targetTag = + targetConstraint.getTargetExpressions().iterator().next() + .getTargetValues().iterator().next(); + // iterate over all nodes + Iterator<SchedulerNode> nodeIter = allNodes.iterator(); + List<SchedulingRequest> schedulingRequests = entry.getValue(); + Iterator<SchedulingRequest> reqIter = schedulingRequests.iterator(); + while (reqIter.hasNext()) { + SchedulingRequest sReq = reqIter.next(); + int numAllocs = sReq.getResourceSizing().getNumAllocations(); + while (numAllocs > 0 && nodeIter.hasNext()) { + SchedulerNode node = nodeIter.next(); + long nodeCardinality = 0; + try { + nodeCardinality = tagsManager.getNodeCardinality( + node.getNodeID(), requests.getApplicationId(), + targetTag); + if (nodeCardinality == 0 && + !requests.getBlacklist(tag).contains(node.getNodeID())) { + numAllocs--; + sReq.getResourceSizing().setNumAllocations(numAllocs); + PlacedSchedulingRequest placedReq = + new PlacedSchedulingRequest(sReq); + placedReq.setPlacementAttempt(requests.getPlacementAttempt()); + placedReq.getNodes().add(node); + resp.getPlacedRequests().add(placedReq); + } + } catch (InvalidAllocationTagsQueryException e) { + LOG.warn("Got exception from TagManager !", e); + } + } + } + } + } + // Add all requests whose numAllocations still > 0 to rejected list. 
+ requests.getSchedulingRequests().stream() + .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0) + .forEach(rejReq -> resp.getRejectedRequests().add(rejReq)); + collector.collect(resp); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java new file mode 100644 index 0000000..7090154 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package o.a.h.yarn.server.resourcemanager.scheduler.constraint.processor + * contains classes related to scheduling containers using placement + * processor. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 12dfe18..975abe6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -21,7 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -39,7 +42,9 @@ import 
org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -57,6 +62,9 @@ public class MockAM { private ApplicationMasterProtocol amRMProtocol; private UserGroupInformation ugi; private volatile AllocateResponse lastResponse; + private Map<Set<String>, PlacementConstraint> placementConstraints = + new HashMap<>(); + private List<SchedulingRequest> schedulingRequests = new ArrayList<>(); private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>(); private final List<ContainerId> releases = new ArrayList<ContainerId>(); @@ -93,6 +101,16 @@ public class MockAM { return registerAppAttempt(true); } + public void addPlacementConstraint(Set<String> tags, + PlacementConstraint constraint) { + placementConstraints.put(tags, constraint); + } + + public MockAM addSchedulingRequest(List<SchedulingRequest> reqs) { + schedulingRequests.addAll(reqs); + return this; + } + public RegisterApplicationMasterResponse registerAppAttempt(boolean wait) throws Exception { if (wait) { @@ -104,6 +122,9 @@ public class MockAM { req.setHost(""); req.setRpcPort(1); req.setTrackingUrl(""); + if (!placementConstraints.isEmpty()) { + req.setPlacementConstraints(this.placementConstraints); + } if (ugi == null) { ugi = UserGroupInformation.createRemoteUser( attemptId.toString()); @@ -247,12 +268,17 @@ public class MockAM { } + public AllocateResponse allocate( List<ResourceRequest> resourceRequest, List<ContainerId> releases) 
throws Exception { final AllocateRequest req = AllocateRequest.newInstance(0, 0F, resourceRequest, releases, null); + if (!schedulingRequests.isEmpty()) { + req.setSchedulingRequests(schedulingRequests); + schedulingRequests.clear(); + } return allocate(req); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 2df3788..eb4c626 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; @@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -1240,6 +1242,18 @@ public class MockRM extends 
ResourceManager { return am; } + public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm, + Map<Set<String>, PlacementConstraint> constraints) throws Exception { + MockAM am = launchAM(app, rm, nm); + for (Map.Entry<Set<String>, PlacementConstraint> e : + constraints.entrySet()) { + am.addPlacementConstraint(e.getKey(), e.getValue()); + } + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + return am; + } + public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException { ApplicationClientProtocol client = getClientRMService(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7d01a2c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java new file mode 100644 index 0000000..db8ae15 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java @@ -0,0 +1,394 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; +import org.apache.hadoop.yarn.api.records.RejectionReason; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.lang.Thread.sleep; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; + +/** + * This tests end2end workflow of the constraint placement framework. + */ +public class TestPlacementProcessor { + + private static final int GB = 1024; + + private static final Log LOG = + LogFactory.getLog(TestPlacementProcessor.class); + private MockRM rm; + private DrainDispatcher dispatcher; + + @Before + public void createAndStartRM() { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.setInt( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1); + startRM(conf); + } + + private void startRM(final YarnConfiguration conf) { + dispatcher = new DrainDispatcher(); + rm = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + } + + @After + public void stopRM() { + if (rm != null) { + rm.stop(); + } + } + + @Test(timeout = 300000) + public void testPlacement() throws Exception { + HashMap<NodeId, MockNM> nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, 
rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm4.getNodeId(), nm4); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, + Collections.singletonMap( + Collections.singleton("foo"), + PlacementConstraints.build( + PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))) + )); + am1.addSchedulingRequest( + Arrays.asList( + schedulingRequest(1, 1, 1, 512, "foo"), + schedulingRequest(1, 2, 1, 512, "foo"), + schedulingRequest(1, 3, 1, 512, "foo"), + schedulingRequest(1, 5, 1, 512, "foo")) + ); + AllocateResponse allocResponse = am1.schedule(); // send the request + List<Container> allocatedContainers = new ArrayList<>(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + + // kick the scheduler + + while (allocatedContainers.size() < 4) { + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + allocResponse = am1.schedule(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + } + + Assert.assertEquals(4, allocatedContainers.size()); + Set<NodeId> nodeIds = allocatedContainers.stream() + .map(x -> x.getNodeId()).collect(Collectors.toSet()); + // Ensure unique nodes + Assert.assertEquals(4, nodeIds.size()); + } + + @Test(timeout = 300000) + public void testSchedulerRejection() throws Exception { + HashMap<NodeId, MockNM> nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, 
// NOTE(review): this chunk is a commit-mail rendering of a unified diff from
// TestPlacementProcessor.java (YARN-7612). The mail archiver flattened the
// patch, so each physical line below holds many '+'-prefixed source lines and
// the test methods span line boundaries. Code is kept byte-identical; only
// comments are added/corrected.
//
// Visible content, in order:
//  * Tail of an anti-affinity test whose signature lies above this chunk:
//    four 4096MB NMs, four "foo" requests under
//    NOT-IN(NODE, allocationTag("foo")); the 5120MB request cannot fit on any
//    node, so the assertions expect exactly 3 containers on 3 distinct nodes
//    plus one rejection with reason COULD_NOT_SCHEDULE_ON_NODE for
//    allocationRequestId 4.
//  * testRePlacementAfterSchedulerRejection: an 8192MB nm5 registers only
//    AFTER the first allocate, so the oversized (5120MB) request is initially
//    placed on a small node; with RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS=2
//    the processor presumably re-places it onto nm5 after the scheduler
//    rejects it — the test expects 4 containers on 4 distinct nodes and no
//    rejection surfacing to the AM. (Retry path itself is not visible here —
//    confirm against PlacementProcessor.)
//  * testPlacementRejection: five mutually anti-affine requests on four
//    nodes — the fifth cannot be placed anywhere, so the test expects 4
//    containers on 4 distinct nodes plus a COULD_NOT_PLACE_ON_NODE rejection.
//  * schedulingRequest(...) helper overloads building a SchedulingRequest via
//    its builder; note the (cores, mem) arguments are passed to
//    Resource.newInstance(mem, cores) — assumed correct since
//    Resource.newInstance takes (memory, vCores); TODO confirm.
//
// All three tests share the same polling shape: heartbeat every NM, sleep 1s,
// re-allocate, and break early once the expected container/rejection counts
// are reached, bounded by allocCount < 11 (max ~10 rounds under the
// 300s @Test timeout).
rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm4.getNodeId(), nm4); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, + Collections.singletonMap( + Collections.singleton("foo"), + PlacementConstraints.build( + PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))) + )); + am1.addSchedulingRequest( + Arrays.asList( + schedulingRequest(1, 1, 1, 512, "foo"), + schedulingRequest(1, 2, 1, 512, "foo"), + schedulingRequest(1, 3, 1, 512, "foo"), + // Ask for a container larger than the node + schedulingRequest(1, 4, 1, 5120, "foo")) + ); + AllocateResponse allocResponse = am1.schedule(); // send the request + List<Container> allocatedContainers = new ArrayList<>(); + List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>(); + int allocCount = 1; + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests()); + + // kick the scheduler + + while (allocCount < 11) { + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + allocResponse = am1.schedule(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests()); + allocCount++; + if (rejectedReqs.size() > 0 && allocatedContainers.size() > 2) { + break; + } + } + + Assert.assertEquals(3, allocatedContainers.size()); + Set<NodeId> nodeIds = 
allocatedContainers.stream() + .map(x -> x.getNodeId()).collect(Collectors.toSet()); + // Ensure unique nodes + Assert.assertEquals(3, nodeIds.size()); + RejectedSchedulingRequest rej = rejectedReqs.get(0); + Assert.assertEquals(4, rej.getRequest().getAllocationRequestId()); + Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, + rej.getReason()); + } + + @Test(timeout = 300000) + public void testRePlacementAfterSchedulerRejection() throws Exception { + stopRM(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.setInt( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 2); + startRM(conf); + + HashMap<NodeId, MockNM> nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm4.getNodeId(), nm4); + MockNM nm5 = new MockNM("h5:1234", 8192, rm.getResourceTrackerService()); + nodes.put(nm5.getNodeId(), nm5); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + // Do not register nm5 yet.. 
+ + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, + Collections.singletonMap( + Collections.singleton("foo"), + PlacementConstraints.build( + PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))) + )); + am1.addSchedulingRequest( + Arrays.asList( + schedulingRequest(1, 1, 1, 512, "foo"), + schedulingRequest(1, 2, 1, 512, "foo"), + schedulingRequest(1, 3, 1, 512, "foo"), + // Ask for a container larger than the node + schedulingRequest(1, 4, 1, 5120, "foo")) + ); + AllocateResponse allocResponse = am1.schedule(); // send the request + List<Container> allocatedContainers = new ArrayList<>(); + List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>(); + int allocCount = 1; + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests()); + + // Register node5 only after first allocate - so the initial placement + // for the large schedReq goes to some other node.. 
+ nm5.registerNode(); + + // kick the scheduler + while (allocCount < 11) { + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + nm5.nodeHeartbeat(true); + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + allocResponse = am1.schedule(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests()); + allocCount++; + if (allocatedContainers.size() > 3) { + break; + } + } + + Assert.assertEquals(4, allocatedContainers.size()); + Set<NodeId> nodeIds = allocatedContainers.stream() + .map(x -> x.getNodeId()).collect(Collectors.toSet()); + // Ensure unique nodes + Assert.assertEquals(4, nodeIds.size()); + } + + @Test(timeout = 300000) + public void testPlacementRejection() throws Exception { + HashMap<NodeId, MockNM> nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm4.getNodeId(), nm4); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, + Collections.singletonMap( + Collections.singleton("foo"), + PlacementConstraints.build( + PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))) + )); + am1.addSchedulingRequest( + Arrays.asList( + schedulingRequest(1, 1, 1, 512, "foo"), + schedulingRequest(1, 2, 1, 512, "foo"), + schedulingRequest(1, 3, 1, 512, "foo"), + schedulingRequest(1, 4, 1, 512, "foo"), + // Ask for more containers than nodes + 
schedulingRequest(1, 5, 1, 512, "foo")) + ); + AllocateResponse allocResponse = am1.schedule(); // send the request + List<Container> allocatedContainers = new ArrayList<>(); + List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>(); + int allocCount = 1; + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests()); + + // kick the scheduler + + while (allocCount < 11) { + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + allocResponse = am1.schedule(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests()); + allocCount++; + if (rejectedReqs.size() > 0 && allocatedContainers.size() > 3) { + break; + } + } + + Assert.assertEquals(4, allocatedContainers.size()); + Set<NodeId> nodeIds = allocatedContainers.stream() + .map(x -> x.getNodeId()).collect(Collectors.toSet()); + // Ensure unique nodes + Assert.assertEquals(4, nodeIds.size()); + RejectedSchedulingRequest rej = rejectedReqs.get(0); + Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE, + rej.getReason()); + } + + private static SchedulingRequest schedulingRequest( + int priority, long allocReqId, int cores, int mem, String... tags) { + return schedulingRequest(priority, allocReqId, cores, mem, + ExecutionType.GUARANTEED, tags); + } + + private static SchedulingRequest schedulingRequest( + int priority, long allocReqId, int cores, int mem, + ExecutionType execType, String... 
tags) { + return SchedulingRequest.newBuilder() + .priority(Priority.newInstance(priority)) + .allocationRequestId(allocReqId) + .allocationTags(new HashSet<>(Arrays.asList(tags))) + .executionType(ExecutionTypeRequest.newInstance(execType, true)) + .resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(mem, cores))) + .build(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org