YARN-7920. Simplify configuration for PlacementConstraints. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0b489e56 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0b489e56 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0b489e56 Branch: refs/heads/trunk Commit: 0b489e564ce5a50324a530e29c18aa8a75276c50 Parents: 4747395 Author: Konstantinos Karanasos <kkarana...@apache.org> Authored: Thu Feb 15 14:23:27 2018 -0800 Committer: Konstantinos Karanasos <kkarana...@apache.org> Committed: Thu Feb 15 14:23:38 2018 -0800 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 54 ++- .../TestAMRMClientPlacementConstraints.java | 3 +- .../src/main/resources/yarn-default.xml | 10 +- .../ApplicationMasterService.java | 46 ++- .../scheduler/capacity/CapacityScheduler.java | 13 - .../CapacitySchedulerConfiguration.java | 5 - .../processor/AbstractPlacementProcessor.java | 96 +++++ .../processor/DisabledPlacementProcessor.java | 77 ++++ .../processor/PlacementConstraintProcessor.java | 340 +++++++++++++++++ .../processor/PlacementProcessor.java | 377 ------------------- .../processor/SchedulerPlacementProcessor.java | 55 +++ ...apacitySchedulerSchedulingRequestUpdate.java | 4 + ...estSchedulingRequestContainerAllocation.java | 8 +- ...hedulingRequestContainerAllocationAsync.java | 4 +- .../scheduler/capacity/TestUtils.java | 4 +- .../constraint/TestPlacementProcessor.java | 12 +- .../src/site/markdown/PlacementConstraints.md | 136 +++++++ .../site/markdown/PlacementConstraints.md.vm | 149 -------- 18 files changed, 818 insertions(+), 575 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 118f9fb..6677478 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -532,11 +532,57 @@ public class YarnConfiguration extends Configuration { public static final String RM_SCHEDULER = RM_PREFIX + "scheduler.class"; - /** Enable rich placement constraints. */ - public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED = - RM_PREFIX + "placement-constraints.enabled"; + /** + * Specify which handler will be used to process PlacementConstraints. + * For details on PlacementConstraints, please refer to + * {@link org.apache.hadoop.yarn.api.resource.PlacementConstraint} + */ + @Private + public static final String RM_PLACEMENT_CONSTRAINTS_HANDLER = + RM_PREFIX + "placement-constraints.handler"; + + /** + * This handler rejects all allocate calls made by an application, if they + * contain a {@link org.apache.hadoop.yarn.api.records.SchedulingRequest}. + */ + @Private + public static final String DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER = + "disabled"; - public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = false; + /** + * Using this handler, the placement of containers with constraints is + * determined as a pre-processing step before the capacity or the fair + * scheduler is called. Once the placement is decided, the capacity/fair + * scheduler is invoked to perform the actual allocation. The advantage of + * this approach is that it supports all constraint types (affinity, + * anti-affinity, cardinality). 
Moreover, it considers multiple containers at + * a time, which allows it to satisfy more constraints than a container-at-a-time + * approach can achieve. As it sits outside the main scheduler, it can be used + * by both the capacity and fair schedulers. Note that at the moment it does + * not account for task priorities within an application, given that such + * priorities might be conflicting with the placement constraints. + */ + @Private + public static final String PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER = + "placement-processor"; + + /** + * Using this handler, containers with constraints will be placed by the main + * scheduler. If the configured RM scheduler + * <pre>yarn.resourcemanager.scheduler.class</pre> + * cannot handle placement constraints, the corresponding SchedulingRequests + * will be rejected. As of now, only the capacity scheduler supports + * SchedulingRequests. In particular, it currently supports anti-affinity + * constraints (no affinity or cardinality) and places one container at a + * time. The advantage of this handler compared to the placement-processor is + * that it follows the same ordering rules for queues (sorted by utilization, + * priority) and apps (sorted by FIFO/fairness/priority) as the ones followed + * by the main scheduler. + */ + @Private + public static final String + SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER = + "scheduler"; /** Placement Algorithm. 
*/ public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS = http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java index fdc8d58..0e88299 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java @@ -65,7 +65,8 @@ public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest { // mismatches between client and server teardown(); conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); createClusterAndStartApplication(conf); AMRMClient<AMRMClient.ContainerRequest> amClient = http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 509a040..adf8d8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -131,9 +131,13 @@ </property> <property> - <description>Enable Constraint Placement.</description> - <name>yarn.resourcemanager.placement-constraints.enabled</name> - <value>false</value> + <description> + Specify which handler will be used to process PlacementConstraints. + Acceptable values are: `placement-processor`, `scheduler` and `disabled`. + For a detailed explanation of these values, please refer to documentation. + </description> + <name>yarn.resourcemanager.placement-constraints.handler</name> + <value>disabled</value> </property> <property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index aa1177d..ae28879 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; 
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementProcessor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; @@ -67,6 +66,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AbstractPlacementProcessor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.DisabledPlacementProcessor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementConstraintProcessor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SchedulerPlacementProcessor; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; @@ -118,20 +121,47 @@ public class ApplicationMasterService extends AbstractService implements initializeProcessingChain(conf); } + private void addPlacementConstraintHandler(Configuration conf) { + String placementConstraintsHandler = + conf.get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER); + if (placementConstraintsHandler + .equals(YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER)) { + LOG.info(YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER + + " placement handler will be used, all scheduling requests will " + + "be rejected."); + 
amsProcessingChain.addProcessor(new DisabledPlacementProcessor()); + } else if (placementConstraintsHandler + .equals(YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER)) { + LOG.info(YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER + + " placement handler will be used. Scheduling requests will be " + + "handled by the placement constraint processor"); + amsProcessingChain.addProcessor(new PlacementConstraintProcessor()); + } else if (placementConstraintsHandler + .equals(YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER)) { + LOG.info(YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER + + " placement handler will be used. Scheduling requests will be " + + "handled by the main scheduler."); + amsProcessingChain.addProcessor(new SchedulerPlacementProcessor()); + } + } + private void initializeProcessingChain(Configuration conf) { amsProcessingChain.init(rmContext, null); - boolean enablePlacementConstraints = conf.getBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, - YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED); - if (enablePlacementConstraints) { - amsProcessingChain.addProcessor(new PlacementProcessor()); - } + addPlacementConstraintHandler(conf); + List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf); if (processors != null) { Collections.reverse(processors); for (ApplicationMasterServiceProcessor p : processors) { // Ensure only single instance of PlacementProcessor is included - if (enablePlacementConstraints && p instanceof PlacementProcessor) { + if (p instanceof AbstractPlacementProcessor) { + LOG.warn("Found PlacementProcessor=" + p.getClass().getCanonicalName() + + " defined in " + + YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS + + ", however PlacementProcessor handler should be configured " + + "by using " + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER + + ", this processor will be ignored."); continue; } 
this.amsProcessingChain.addProcessor(p); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index cd9d1373..ddab0c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; @@ -1098,18 +1097,6 @@ public class CapacityScheduler extends return EMPTY_ALLOCATION; } - if ((!getConfiguration().getBoolean( - CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - CapacitySchedulerConfiguration.DEFAULT_SCHEDULING_REQUEST_ALLOWED)) - && schedulingRequests != null && 
(!schedulingRequests.isEmpty())) { - throw new SchedulerInvalidResoureRequestException( - "Application attempt:" + applicationAttemptId - + " is using SchedulingRequest, which is disabled. Please update " - + CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED - + " to true in capacity-scheduler.xml in order to use this " - + "feature."); - } - // The allocate may be the leftover from previous attempt, and it will // impact current attempt, such as confuse the request and allocation for // current attempt's AM container. http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 00733a1..e609be9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -77,11 +77,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final String PREFIX = "yarn.scheduler.capacity."; - - @Private - public static final String SCHEDULING_REQUEST_ALLOWED = - PREFIX + 
"scheduling-request.allowed"; - public static final boolean DEFAULT_SCHEDULING_REQUEST_ALLOWED = false; @Private public static final String DOT = "."; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java new file mode 100644 index 0000000..96ae623 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +/** + * Base class for all PlacementProcessors. 
+ */ +public abstract class AbstractPlacementProcessor implements + ApplicationMasterServiceProcessor{ + private static final Logger LOG = + LoggerFactory.getLogger(AbstractPlacementProcessor.class); + + protected ApplicationMasterServiceProcessor nextAMSProcessor; + protected AbstractYarnScheduler scheduler; + private PlacementConstraintManager constraintManager; + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor nextProcessor) { + this.nextAMSProcessor = nextProcessor; + this.scheduler = + (AbstractYarnScheduler) ((RMContextImpl) amsContext).getScheduler(); + this.constraintManager = + ((RMContextImpl)amsContext).getPlacementConstraintManager(); + } + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + Map<Set<String>, PlacementConstraint> appPlacementConstraints = + request.getPlacementConstraints(); + processPlacementConstraints(applicationAttemptId.getApplicationId(), + appPlacementConstraints); + nextAMSProcessor.registerApplicationMaster(applicationAttemptId, request, + response); + } + + private void processPlacementConstraints(ApplicationId applicationId, + Map<Set<String>, PlacementConstraint> appPlacementConstraints) { + if (appPlacementConstraints != null && !appPlacementConstraints.isEmpty()) { + LOG.info("Constraints added for application [{}] against tags [{}]", + applicationId, appPlacementConstraints); + constraintManager.registerApplication( + applicationId, appPlacementConstraints); + } + } + + @Override + public void finishApplicationMaster(ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + constraintManager.unregisterApplication( + applicationAttemptId.getApplicationId()); + 
this.nextAMSProcessor.finishApplicationMaster(applicationAttemptId, request, + response); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java new file mode 100644 index 0000000..0d093a7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Processor that rejects all SchedulingRequests. + */ +public class DisabledPlacementProcessor extends AbstractPlacementProcessor { + private static final Logger LOG = + LoggerFactory.getLogger(DisabledPlacementProcessor.class); + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + if (request.getPlacementConstraints() != null && !request + .getPlacementConstraints().isEmpty()) { + String message = "Found non empty placement constraints map in " + + "RegisterApplicationMasterRequest for application=" + + applicationAttemptId.toString() + ", but the configured " + + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER + + " cannot handle placement constraints. 
Rejecting this " + + "registerApplicationMaster operation"; + LOG.warn(message); + throw new YarnException(message); + } + nextAMSProcessor.registerApplicationMaster(applicationAttemptId, request, + response); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + if (request.getSchedulingRequests() != null && !request + .getSchedulingRequests().isEmpty()) { + String message = "Found non empty SchedulingRequest in " + + "AllocateRequest for application=" + + appAttemptId.toString() + ", but the configured " + + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER + + " cannot handle placement constraints. Rejecting this " + + "allocate operation"; + LOG.warn(message); + throw new YarnException(message); + } + nextAMSProcessor.allocate(appAttemptId, request, response); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java new file mode 100644 index 0000000..f089a19 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; +import org.apache.hadoop.yarn.api.records.RejectionReason; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import 
org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** + * An ApplicationMasterServiceProcessor that performs Constrained placement of + * Scheduling Requests. It does the following: + * 1. All initialization. + * 2. Intercepts placement constraints from the register call and adds it to + * the placement constraint manager. + * 3. Dispatches Scheduling Requests to the Planner. + */ +public class PlacementConstraintProcessor extends AbstractPlacementProcessor { + + /** + * Wrapper over the SchedulingResponse that wires in the placement attempt + * and last attempted Node. 
+ */ + static final class Response extends SchedulingResponse { + + private final int placementAttempt; + private final SchedulerNode attemptedNode; + + private Response(boolean isSuccess, ApplicationId applicationId, + SchedulingRequest schedulingRequest, int placementAttempt, + SchedulerNode attemptedNode) { + super(isSuccess, applicationId, schedulingRequest); + this.placementAttempt = placementAttempt; + this.attemptedNode = attemptedNode; + } + } + + private static final Logger LOG = + LoggerFactory.getLogger(PlacementConstraintProcessor.class); + + private ExecutorService schedulingThreadPool; + private int retryAttempts; + private Map<ApplicationId, List<BatchedRequests>> requestsToRetry = + new ConcurrentHashMap<>(); + private Map<ApplicationId, List<SchedulingRequest>> requestsToReject = + new ConcurrentHashMap<>(); + + private BatchedRequests.IteratorType iteratorType; + private PlacementDispatcher placementDispatcher; + + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor nextProcessor) { + LOG.info("Initializing Constraint Placement Processor:"); + super.init(amsContext, nextProcessor); + + // Only the first class is considered - even if a comma separated + // list is provided. 
(This is for simplicity, since getInstances does a + // lot of good things by handling things correctly) + List<ConstraintPlacementAlgorithm> instances = + ((RMContextImpl) amsContext).getYarnConfiguration().getInstances( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS, + ConstraintPlacementAlgorithm.class); + ConstraintPlacementAlgorithm algorithm = null; + if (instances != null && !instances.isEmpty()) { + algorithm = instances.get(0); + } else { + algorithm = new DefaultPlacementAlgorithm(); + } + LOG.info("Placement Algorithm [{}]", algorithm.getClass().getName()); + + String iteratorName = ((RMContextImpl) amsContext).getYarnConfiguration() + .get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR, + BatchedRequests.IteratorType.SERIAL.name()); + LOG.info("Placement Algorithm Iterator[{}]", iteratorName); + try { + iteratorType = BatchedRequests.IteratorType.valueOf(iteratorName); + } catch (IllegalArgumentException e) { + throw new YarnRuntimeException( + "Could not instantiate Placement Algorithm Iterator: ", e); + } + + int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE); + this.placementDispatcher = new PlacementDispatcher(); + this.placementDispatcher.init( + ((RMContextImpl)amsContext), algorithm, algoPSize); + LOG.info("Planning Algorithm pool size [{}]", algoPSize); + + int schedPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE); + this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize); + LOG.info("Scheduler pool size [{}]", schedPSize); + + // Number of times a request that is not satisfied by the scheduler + // can be retried. 
+ this.retryAttempts = + ((RMContextImpl) amsContext).getYarnConfiguration().getInt( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS); + LOG.info("Num retry attempts [{}]", this.retryAttempts); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + // Copy the scheduling request since we will clear it later after sending + // to dispatcher + List<SchedulingRequest> schedulingRequests = + new ArrayList<>(request.getSchedulingRequests()); + dispatchRequestsForPlacement(appAttemptId, schedulingRequests); + reDispatchRetryableRequests(appAttemptId); + schedulePlacedRequests(appAttemptId); + + // Remove SchedulingRequest from AllocateRequest to avoid SchedulingRequest + // added to scheduler. + request.setSchedulingRequests(Collections.emptyList()); + + nextAMSProcessor.allocate(appAttemptId, request, response); + + handleRejectedRequests(appAttemptId, response); + } + + private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId, + List<SchedulingRequest> schedulingRequests) { + if (schedulingRequests != null && !schedulingRequests.isEmpty()) { + // Normalize the Requests before dispatching + schedulingRequests.forEach(req -> { + Resource reqResource = req.getResourceSizing().getResources(); + req.getResourceSizing() + .setResources(this.scheduler.getNormalizedResource(reqResource)); + }); + this.placementDispatcher.dispatch(new BatchedRequests(iteratorType, + appAttemptId.getApplicationId(), schedulingRequests, 1)); + } + } + + private void reDispatchRetryableRequests(ApplicationAttemptId appAttId) { + List<BatchedRequests> reqsToRetry = + this.requestsToRetry.get(appAttId.getApplicationId()); + if (reqsToRetry != null && !reqsToRetry.isEmpty()) { + synchronized (reqsToRetry) { + for (BatchedRequests bReq: reqsToRetry) { + this.placementDispatcher.dispatch(bReq); + } + 
reqsToRetry.clear(); + } + } + } + + private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) { + ApplicationId applicationId = appAttemptId.getApplicationId(); + List<PlacedSchedulingRequest> placedSchedulingRequests = + this.placementDispatcher.pullPlacedRequests(applicationId); + for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) { + SchedulingRequest sReq = placedReq.getSchedulingRequest(); + for (SchedulerNode node : placedReq.getNodes()) { + final SchedulingRequest sReqClone = + SchedulingRequest.newInstance(sReq.getAllocationRequestId(), + sReq.getPriority(), sReq.getExecutionType(), + sReq.getAllocationTags(), + ResourceSizing.newInstance( + sReq.getResourceSizing().getResources()), + sReq.getPlacementConstraint()); + SchedulerApplicationAttempt applicationAttempt = + this.scheduler.getApplicationAttempt(appAttemptId); + Runnable task = () -> { + boolean success = + scheduler.attemptAllocationOnNode( + applicationAttempt, sReqClone, node); + if (!success) { + LOG.warn("Unsuccessful allocation attempt [{}] for [{}]", + placedReq.getPlacementAttempt(), sReqClone); + } + handleSchedulingResponse( + new Response(success, applicationId, sReqClone, + placedReq.getPlacementAttempt(), node)); + }; + this.schedulingThreadPool.submit(task); + } + } + } + + private void handleRejectedRequests(ApplicationAttemptId appAttemptId, + AllocateResponse response) { + List<SchedulingRequestWithPlacementAttempt> rejectedAlgoRequests = + this.placementDispatcher.pullRejectedRequests( + appAttemptId.getApplicationId()); + if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) { + LOG.warn("Following requests of [{}] were rejected by" + + " the PlacementAlgorithmOutput Algorithm: {}", + appAttemptId.getApplicationId(), rejectedAlgoRequests); + rejectedAlgoRequests.stream() + .filter(req -> req.getPlacementAttempt() < retryAttempts) + .forEach(req -> handleSchedulingResponse( + new Response(false, appAttemptId.getApplicationId(), + 
req.getSchedulingRequest(), req.getPlacementAttempt(), + null))); + ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response, + rejectedAlgoRequests.stream() + .filter(req -> req.getPlacementAttempt() >= retryAttempts) + .map(sr -> RejectedSchedulingRequest.newInstance( + RejectionReason.COULD_NOT_PLACE_ON_NODE, + sr.getSchedulingRequest())) + .collect(Collectors.toList())); + } + List<SchedulingRequest> rejectedRequests = + this.requestsToReject.get(appAttemptId.getApplicationId()); + if (rejectedRequests != null && !rejectedRequests.isEmpty()) { + synchronized (rejectedRequests) { + LOG.warn("Following requests of [{}] exhausted all retry attempts " + + "trying to schedule on placed node: {}", + appAttemptId.getApplicationId(), rejectedRequests); + ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response, + rejectedRequests.stream() + .map(sr -> RejectedSchedulingRequest.newInstance( + RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, sr)) + .collect(Collectors.toList())); + rejectedRequests.clear(); + } + } + } + + @Override + public void finishApplicationMaster(ApplicationAttemptId appAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + placementDispatcher.clearApplicationState(appAttemptId.getApplicationId()); + requestsToReject.remove(appAttemptId.getApplicationId()); + requestsToRetry.remove(appAttemptId.getApplicationId()); + super.finishApplicationMaster(appAttemptId, request, response); + } + + private void handleSchedulingResponse(SchedulingResponse schedulerResponse) { + int placementAttempt = ((Response)schedulerResponse).placementAttempt; + // Retry this placement as it is not successful and we are still + // under max retry. 
The req is batched with other unsuccessful + // requests from the same app + if (!schedulerResponse.isSuccess() && placementAttempt < retryAttempts) { + List<BatchedRequests> reqsToRetry = + requestsToRetry.computeIfAbsent( + schedulerResponse.getApplicationId(), + k -> new ArrayList<>()); + synchronized (reqsToRetry) { + addToRetryList(schedulerResponse, placementAttempt, reqsToRetry); + } + LOG.warn("Going to retry request for application [{}] after [{}]" + + " attempts: [{}]", schedulerResponse.getApplicationId(), + placementAttempt, schedulerResponse.getSchedulingRequest()); + } else { + if (!schedulerResponse.isSuccess()) { + LOG.warn("Not retrying request for application [{}] after [{}]" + + " attempts: [{}]", schedulerResponse.getApplicationId(), + placementAttempt, schedulerResponse.getSchedulingRequest()); + List<SchedulingRequest> reqsToReject = + requestsToReject.computeIfAbsent( + schedulerResponse.getApplicationId(), + k -> new ArrayList<>()); + synchronized (reqsToReject) { + reqsToReject.add(schedulerResponse.getSchedulingRequest()); + } + } + } + } + + private void addToRetryList(SchedulingResponse schedulerResponse, + int placementAttempt, List<BatchedRequests> reqsToRetry) { + boolean isAdded = false; + for (BatchedRequests br : reqsToRetry) { + if (br.getPlacementAttempt() == placementAttempt + 1) { + br.addToBatch(schedulerResponse.getSchedulingRequest()); + br.addToBlacklist( + schedulerResponse.getSchedulingRequest().getAllocationTags(), + ((Response) schedulerResponse).attemptedNode); + isAdded = true; + break; + } + } + if (!isAdded) { + BatchedRequests br = new BatchedRequests(iteratorType, + schedulerResponse.getApplicationId(), + Collections.singleton(schedulerResponse.getSchedulingRequest()), + placementAttempt + 1); + reqsToRetry.add(br); + br.addToBlacklist( + schedulerResponse.getSchedulingRequest().getAllocationTags(), + ((Response) schedulerResponse).attemptedNode); + } + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java deleted file mode 100644 index 9ce38f4..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java +++ /dev/null @@ -1,377 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; - -import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; -import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; -import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; -import org.apache.hadoop.yarn.api.records.RejectionReason; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceSizing; -import org.apache.hadoop.yarn.api.records.SchedulingRequest; -import org.apache.hadoop.yarn.api.resource.PlacementConstraint; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.stream.Collectors; - -/** - * An ApplicationMasterService Processor that performs Constrained placement of - * Scheduling Requests. It does the following: - * 1. All initialization. - * 2. Intercepts placement constraints from the register call and adds it to - * the placement constraint manager. - * 3. Dispatches Scheduling Requests to the Planner. - */ -public class PlacementProcessor implements ApplicationMasterServiceProcessor { - - /** - * Wrapper over the SchedulingResponse that wires in the placement attempt - * and last attempted Node. 
- */ - static final class Response extends SchedulingResponse { - - private final int placementAttempt; - private final SchedulerNode attemptedNode; - - private Response(boolean isSuccess, ApplicationId applicationId, - SchedulingRequest schedulingRequest, int placementAttempt, - SchedulerNode attemptedNode) { - super(isSuccess, applicationId, schedulingRequest); - this.placementAttempt = placementAttempt; - this.attemptedNode = attemptedNode; - } - } - - private static final Logger LOG = - LoggerFactory.getLogger(PlacementProcessor.class); - private PlacementConstraintManager constraintManager; - private ApplicationMasterServiceProcessor nextAMSProcessor; - - private AbstractYarnScheduler scheduler; - private ExecutorService schedulingThreadPool; - private int retryAttempts; - private Map<ApplicationId, List<BatchedRequests>> requestsToRetry = - new ConcurrentHashMap<>(); - private Map<ApplicationId, List<SchedulingRequest>> requestsToReject = - new ConcurrentHashMap<>(); - - private BatchedRequests.IteratorType iteratorType; - private PlacementDispatcher placementDispatcher; - - - @Override - public void init(ApplicationMasterServiceContext amsContext, - ApplicationMasterServiceProcessor nextProcessor) { - LOG.info("Initializing Constraint Placement Processor:"); - this.nextAMSProcessor = nextProcessor; - this.constraintManager = - ((RMContextImpl)amsContext).getPlacementConstraintManager(); - - this.scheduler = - (AbstractYarnScheduler)((RMContextImpl)amsContext).getScheduler(); - // Only the first class is considered - even if a comma separated - // list is provided. 
(This is for simplicity, since getInstances does a - // lot of good things by handling things correctly) - List<ConstraintPlacementAlgorithm> instances = - ((RMContextImpl) amsContext).getYarnConfiguration().getInstances( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS, - ConstraintPlacementAlgorithm.class); - ConstraintPlacementAlgorithm algorithm = null; - if (instances != null && !instances.isEmpty()) { - algorithm = instances.get(0); - } else { - algorithm = new DefaultPlacementAlgorithm(); - } - LOG.info("Placement Algorithm [{}]", algorithm.getClass().getName()); - - String iteratorName = ((RMContextImpl) amsContext).getYarnConfiguration() - .get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR, - BatchedRequests.IteratorType.SERIAL.name()); - LOG.info("Placement Algorithm Iterator[{}]", iteratorName); - try { - iteratorType = BatchedRequests.IteratorType.valueOf(iteratorName); - } catch (IllegalArgumentException e) { - throw new YarnRuntimeException( - "Could not instantiate Placement Algorithm Iterator: ", e); - } - - int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE, - YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE); - this.placementDispatcher = new PlacementDispatcher(); - this.placementDispatcher.init( - ((RMContextImpl)amsContext), algorithm, algoPSize); - LOG.info("Planning Algorithm pool size [{}]", algoPSize); - - int schedPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE, - YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE); - this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize); - LOG.info("Scheduler pool size [{}]", schedPSize); - - // Number of times a request that is not satisfied by the scheduler - // can be retried. 
- this.retryAttempts = - ((RMContextImpl) amsContext).getYarnConfiguration().getInt( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS); - LOG.info("Num retry attempts [{}]", this.retryAttempts); - } - - @Override - public void registerApplicationMaster(ApplicationAttemptId appAttemptId, - RegisterApplicationMasterRequest request, - RegisterApplicationMasterResponse response) - throws IOException, YarnException { - Map<Set<String>, PlacementConstraint> appPlacementConstraints = - request.getPlacementConstraints(); - processPlacementConstraints( - appAttemptId.getApplicationId(), appPlacementConstraints); - nextAMSProcessor.registerApplicationMaster(appAttemptId, request, response); - } - - private void processPlacementConstraints(ApplicationId applicationId, - Map<Set<String>, PlacementConstraint> appPlacementConstraints) { - if (appPlacementConstraints != null && !appPlacementConstraints.isEmpty()) { - LOG.info("Constraints added for application [{}] against tags [{}]", - applicationId, appPlacementConstraints); - constraintManager.registerApplication( - applicationId, appPlacementConstraints); - } - } - - @Override - public void allocate(ApplicationAttemptId appAttemptId, - AllocateRequest request, AllocateResponse response) throws YarnException { - // Copy the scheduling request since we will clear it later after sending - // to dispatcher - List<SchedulingRequest> schedulingRequests = - new ArrayList<>(request.getSchedulingRequests()); - dispatchRequestsForPlacement(appAttemptId, schedulingRequests); - reDispatchRetryableRequests(appAttemptId); - schedulePlacedRequests(appAttemptId); - - // Remove SchedulingRequest from AllocateRequest to avoid SchedulingRequest - // added to scheduler. 
- request.setSchedulingRequests(Collections.emptyList()); - - nextAMSProcessor.allocate(appAttemptId, request, response); - - handleRejectedRequests(appAttemptId, response); - } - - private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId, - List<SchedulingRequest> schedulingRequests) { - if (schedulingRequests != null && !schedulingRequests.isEmpty()) { - // Normalize the Requests before dispatching - schedulingRequests.forEach(req -> { - Resource reqResource = req.getResourceSizing().getResources(); - req.getResourceSizing() - .setResources(this.scheduler.getNormalizedResource(reqResource)); - }); - this.placementDispatcher.dispatch(new BatchedRequests(iteratorType, - appAttemptId.getApplicationId(), schedulingRequests, 1)); - } - } - - private void reDispatchRetryableRequests(ApplicationAttemptId appAttId) { - List<BatchedRequests> reqsToRetry = - this.requestsToRetry.get(appAttId.getApplicationId()); - if (reqsToRetry != null && !reqsToRetry.isEmpty()) { - synchronized (reqsToRetry) { - for (BatchedRequests bReq: reqsToRetry) { - this.placementDispatcher.dispatch(bReq); - } - reqsToRetry.clear(); - } - } - } - - private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) { - ApplicationId applicationId = appAttemptId.getApplicationId(); - List<PlacedSchedulingRequest> placedSchedulingRequests = - this.placementDispatcher.pullPlacedRequests(applicationId); - for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) { - SchedulingRequest sReq = placedReq.getSchedulingRequest(); - for (SchedulerNode node : placedReq.getNodes()) { - final SchedulingRequest sReqClone = - SchedulingRequest.newInstance(sReq.getAllocationRequestId(), - sReq.getPriority(), sReq.getExecutionType(), - sReq.getAllocationTags(), - ResourceSizing.newInstance( - sReq.getResourceSizing().getResources()), - sReq.getPlacementConstraint()); - SchedulerApplicationAttempt applicationAttempt = - this.scheduler.getApplicationAttempt(appAttemptId); - Runnable 
task = () -> { - boolean success = - scheduler.attemptAllocationOnNode( - applicationAttempt, sReqClone, node); - if (!success) { - LOG.warn("Unsuccessful allocation attempt [{}] for [{}]", - placedReq.getPlacementAttempt(), sReqClone); - } - handleSchedulingResponse( - new Response(success, applicationId, sReqClone, - placedReq.getPlacementAttempt(), node)); - }; - this.schedulingThreadPool.submit(task); - } - } - } - - private void handleRejectedRequests(ApplicationAttemptId appAttemptId, - AllocateResponse response) { - List<SchedulingRequestWithPlacementAttempt> rejectedAlgoRequests = - this.placementDispatcher.pullRejectedRequests( - appAttemptId.getApplicationId()); - if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) { - LOG.warn("Following requests of [{}] were rejected by" + - " the PlacementAlgorithmOutput Algorithm: {}", - appAttemptId.getApplicationId(), rejectedAlgoRequests); - rejectedAlgoRequests.stream() - .filter(req -> req.getPlacementAttempt() < retryAttempts) - .forEach(req -> handleSchedulingResponse( - new Response(false, appAttemptId.getApplicationId(), - req.getSchedulingRequest(), req.getPlacementAttempt(), - null))); - ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response, - rejectedAlgoRequests.stream() - .filter(req -> req.getPlacementAttempt() >= retryAttempts) - .map(sr -> RejectedSchedulingRequest.newInstance( - RejectionReason.COULD_NOT_PLACE_ON_NODE, - sr.getSchedulingRequest())) - .collect(Collectors.toList())); - } - List<SchedulingRequest> rejectedRequests = - this.requestsToReject.get(appAttemptId.getApplicationId()); - if (rejectedRequests != null && !rejectedRequests.isEmpty()) { - synchronized (rejectedRequests) { - LOG.warn("Following requests of [{}] exhausted all retry attempts " + - "trying to schedule on placed node: {}", - appAttemptId.getApplicationId(), rejectedRequests); - ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response, - rejectedRequests.stream() - .map(sr 
-> RejectedSchedulingRequest.newInstance( - RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, sr)) - .collect(Collectors.toList())); - rejectedRequests.clear(); - } - } - } - - @Override - public void finishApplicationMaster(ApplicationAttemptId appAttemptId, - FinishApplicationMasterRequest request, - FinishApplicationMasterResponse response) { - constraintManager.unregisterApplication(appAttemptId.getApplicationId()); - placementDispatcher.clearApplicationState(appAttemptId.getApplicationId()); - requestsToReject.remove(appAttemptId.getApplicationId()); - requestsToRetry.remove(appAttemptId.getApplicationId()); - nextAMSProcessor.finishApplicationMaster(appAttemptId, request, response); - } - - private void handleSchedulingResponse(SchedulingResponse schedulerResponse) { - int placementAttempt = ((Response)schedulerResponse).placementAttempt; - // Retry this placement as it is not successful and we are still - // under max retry. The req is batched with other unsuccessful - // requests from the same app - if (!schedulerResponse.isSuccess() && placementAttempt < retryAttempts) { - List<BatchedRequests> reqsToRetry = - requestsToRetry.computeIfAbsent( - schedulerResponse.getApplicationId(), - k -> new ArrayList<>()); - synchronized (reqsToRetry) { - addToRetryList(schedulerResponse, placementAttempt, reqsToRetry); - } - LOG.warn("Going to retry request for application [{}] after [{}]" + - " attempts: [{}]", schedulerResponse.getApplicationId(), - placementAttempt, schedulerResponse.getSchedulingRequest()); - } else { - if (!schedulerResponse.isSuccess()) { - LOG.warn("Not retrying request for application [{}] after [{}]" + - " attempts: [{}]", schedulerResponse.getApplicationId(), - placementAttempt, schedulerResponse.getSchedulingRequest()); - List<SchedulingRequest> reqsToReject = - requestsToReject.computeIfAbsent( - schedulerResponse.getApplicationId(), - k -> new ArrayList<>()); - synchronized (reqsToReject) { - 
reqsToReject.add(schedulerResponse.getSchedulingRequest()); - } - } - } - } - - private void addToRetryList(SchedulingResponse schedulerResponse, - int placementAttempt, List<BatchedRequests> reqsToRetry) { - boolean isAdded = false; - for (BatchedRequests br : reqsToRetry) { - if (br.getPlacementAttempt() == placementAttempt + 1) { - br.addToBatch(schedulerResponse.getSchedulingRequest()); - br.addToBlacklist( - schedulerResponse.getSchedulingRequest().getAllocationTags(), - ((Response) schedulerResponse).attemptedNode); - isAdded = true; - break; - } - } - if (!isAdded) { - BatchedRequests br = new BatchedRequests(iteratorType, - schedulerResponse.getApplicationId(), - Collections.singleton(schedulerResponse.getSchedulingRequest()), - placementAttempt + 1); - reqsToRetry.add(br); - br.addToBlacklist( - schedulerResponse.getSchedulingRequest().getAllocationTags(), - ((Response) schedulerResponse).attemptedNode); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java new file mode 100644 index 0000000..5332e34 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java @@ -0,0 
+1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Forwarding SchedulingRequests to be handled by the scheduler, as long as the + * scheduler supports SchedulingRequests. 
+ */ +public class SchedulerPlacementProcessor extends AbstractPlacementProcessor { + private static final Logger LOG = + LoggerFactory.getLogger(SchedulerPlacementProcessor.class); + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + if (request.getSchedulingRequests() != null + && !request.getSchedulingRequests().isEmpty()) { + if (!(scheduler instanceof CapacityScheduler)) { + String message = "Found non empty SchedulingRequest of " + + "AllocateRequest for application=" + appAttemptId.toString() + + ", however the configured scheduler=" + + scheduler.getClass().getCanonicalName() + + " cannot handle placement constraints, rejecting this " + + "allocate operation"; + LOG.warn(message); + throw new YarnException(message); + } + } + nextAMSProcessor.allocate(appAttemptId, request, response); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java index 484d780..ee7e013 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java @@ -50,6 +50,8 @@ public class TestCapacitySchedulerSchedulingRequestUpdate Configuration conf = TestUtils.getConfigurationWithQueueLabels( new Configuration(false)); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); mgr.init(conf); @@ -166,6 +168,8 @@ public class TestCapacitySchedulerSchedulingRequestUpdate Configuration conf = TestUtils.getConfigurationWithQueueLabels( new Configuration(false)); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); mgr.init(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java index b297f79..27d8661 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java @@ -58,8 +58,8 @@ public class TestSchedulingRequestContainerAllocation { public void testIntraAppAntiAffinity() throws Exception { Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( new Configuration()); - csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); + csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); // inject node label manager MockRM rm1 = new MockRM(csConf) { @@ -141,8 +141,8 @@ public class TestSchedulingRequestContainerAllocation { public void testIntraAppAntiAffinityWithMultipleTags() throws Exception { Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( new Configuration()); - csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); + csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); // inject node label manager MockRM rm1 = new MockRM(csConf) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java index fc1cb0d..d1d05dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java @@ -57,13 +57,13 @@ public class TestSchedulingRequestContainerAllocationAsync { private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception { Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( new Configuration()); - csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); csConf.setInt( CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, numThreads); csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms", 0); + csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); // inject node label manager MockRM rm1 = new MockRM(csConf) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 7180e24..fae63be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -275,9 +275,7 @@ public class TestUtils { public static Configuration getConfigurationWithQueueLabels(Configuration config) { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(config); - conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); - + // Define top-level queues conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java index 
c4c0b5d..e129a75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java @@ -86,8 +86,8 @@ public class TestPlacementProcessor { YarnConfiguration conf = new YarnConfiguration(csConf); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - conf.setBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); conf.setInt( YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1); startRM(conf); @@ -381,8 +381,8 @@ public class TestPlacementProcessor { YarnConfiguration conf = new YarnConfiguration(csConf); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - conf.setBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); startRM(conf); HashMap<NodeId, MockNM> nodes = new HashMap<>(); @@ -533,8 +533,8 @@ public class TestPlacementProcessor { YarnConfiguration conf = new YarnConfiguration(csConf); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - conf.setBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); conf.setInt( YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 2); startRM(conf); --------------------------------------------------------------------- To 
unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org