Repository: hadoop Updated Branches: refs/heads/branch-2 2d62af654 -> 043b7d133
YARN-6776. Refactor ApplicaitonMasterService to move actual processing logic to a separate class. (asuresh) (cherry picked from commit 5496a34c0cb2b1a83cfa6b0aba5a77b05ff2d8f0) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/043b7d13 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/043b7d13 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/043b7d13 Branch: refs/heads/branch-2 Commit: 043b7d133ef48d590c62b786c53ec8e6d0e2e4c5 Parents: 2d62af6 Author: Arun Suresh <asur...@apache.org> Authored: Mon Jul 10 14:34:58 2017 -0700 Committer: Arun Suresh <asur...@apache.org> Committed: Fri Aug 4 16:43:35 2017 -0700 ---------------------------------------------------------------------- .../ams/ApplicationMasterServiceProcessor.java | 71 +++ .../yarn/ams/ApplicationMasterServiceUtils.java | 89 ++++ .../apache/hadoop/yarn/ams/package-info.java | 24 + .../ApplicationMasterService.java | 424 +----------------- .../resourcemanager/DefaultAMSProcessor.java | 447 +++++++++++++++++++ ...pportunisticContainerAllocatorAMService.java | 162 +++---- ...pportunisticContainerAllocatorAMService.java | 8 + 7 files changed, 746 insertions(+), 479 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java new file mode 100644 index 0000000..b426f48 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.ams; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords + .FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords + .RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; + +/** + * Interface to abstract out the actual processing logic of the + * Application Master Service. + */ +public interface ApplicationMasterServiceProcessor { + + /** + * Register AM attempt. + * @param applicationAttemptId applicationAttemptId. + * @param request Register Request. + * @return Register Response. + * @throws IOException IOException. 
+ */ + RegisterApplicationMasterResponse registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request) throws IOException; + + /** + * Allocate call. + * @param appAttemptId appAttemptId. + * @param request Allocate Request. + * @return Allocate Response. + * @throws YarnException YarnException. + */ + AllocateResponse allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request) throws YarnException; + + /** + * Finish AM. + * @param applicationAttemptId applicationAttemptId. + * @param request Finish AM Request. + * @return Finish AM response. + */ + FinishApplicationMasterResponse finishApplicationMaster( + ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java new file mode 100644 index 0000000..476da8b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.ams; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; + +import java.util.ArrayList; +import java.util.List; + +/** + * Utility methods to be used by {@link ApplicationMasterServiceProcessor}. + */ +public final class ApplicationMasterServiceUtils { + + private ApplicationMasterServiceUtils() { } + + /** + * Add update container errors to {@link AllocateResponse}. + * @param allocateResponse Allocate Response. + * @param updateContainerErrors Errors. + */ + public static void addToUpdateContainerErrors( + AllocateResponse allocateResponse, + List<UpdateContainerError> updateContainerErrors) { + if (!updateContainerErrors.isEmpty()) { + if (allocateResponse.getUpdateErrors() != null + && !allocateResponse.getUpdateErrors().isEmpty()) { + updateContainerErrors.addAll(allocateResponse.getUpdateErrors()); + } + allocateResponse.setUpdateErrors(updateContainerErrors); + } + } + + /** + * Add updated containers to {@link AllocateResponse}. + * @param allocateResponse Allocate Response. + * @param updateType Update Type. + * @param updatedContainers Updated Containers. 
+ */ + public static void addToUpdatedContainers(AllocateResponse allocateResponse, + ContainerUpdateType updateType, List<Container> updatedContainers) { + if (updatedContainers != null && updatedContainers.size() > 0) { + ArrayList<UpdatedContainer> containersToSet = new ArrayList<>(); + if (allocateResponse.getUpdatedContainers() != null && + !allocateResponse.getUpdatedContainers().isEmpty()) { + containersToSet.addAll(allocateResponse.getUpdatedContainers()); + } + for (Container updatedContainer : updatedContainers) { + containersToSet.add( + UpdatedContainer.newInstance(updateType, updatedContainer)); + } + allocateResponse.setUpdatedContainers(containersToSet); + } + } + + /** + * Add allocated containers to {@link AllocateResponse}. + * @param allocateResponse Allocate Response. + * @param allocatedContainers Allocated Containers. + */ + public static void addToAllocatedContainers(AllocateResponse allocateResponse, + List<Container> allocatedContainers) { + if (allocateResponse.getAllocatedContainers() != null + && !allocateResponse.getAllocatedContainers().isEmpty()) { + allocatedContainers.addAll(allocateResponse.getAllocatedContainers()); + } + allocateResponse.setAllocatedContainers(allocatedContainers); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/package-info.java new file mode 100644 index 0000000..b23534e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Public api for Application Master Service interceptors. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.ams; +import org.apache.hadoop.classification.InterfaceAudience; + http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 9d18ced..5ed0a80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -21,12 +21,8 @@ 
package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -37,10 +33,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -52,30 +48,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerUpdateType; -import org.apache.hadoop.yarn.api.records.NMToken; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.PreemptionContainer; -import org.apache.hadoop.yarn.api.records.PreemptionContract; -import org.apache.hadoop.yarn.api.records.PreemptionMessage; -import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest; -import 
org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; -import org.apache.hadoop.yarn.api.records.UpdateContainerError; -import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; -import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; -import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -86,25 +63,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; - -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .AbstractYarnScheduler; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security .AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -124,6 +88,12 @@ public class ApplicationMasterService extends AbstractService implements private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap = new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>(); protected final RMContext rmContext; + private final ApplicationMasterServiceProcessor amsProcessor; + + public ApplicationMasterService(RMContext rmContext, + YarnScheduler scheduler) { + this(ApplicationMasterService.class.getName(), rmContext, scheduler); + } public ApplicationMasterService(String name, RMContext rmContext, YarnScheduler scheduler) { @@ -131,11 +101,11 @@ public class ApplicationMasterService extends AbstractService implements this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; this.rmContext = rmContext; + this.amsProcessor = createProcessor(); } - public ApplicationMasterService(RMContext rmContext, - YarnScheduler scheduler) { - this(ApplicationMasterService.class.getName(), rmContext, scheduler); + protected ApplicationMasterServiceProcessor createProcessor() { + return new DefaultAMSProcessor(rmContext, rScheduler); } @Override @@ -230,82 +200,22 
@@ public class ApplicationMasterService extends AbstractService implements + appID; LOG.warn(message); RMAuditLogger.logFailure( - this.rmContext.getRMApps() - .get(appID).getUser(), - AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, - appID, applicationAttemptId); + this.rmContext.getRMApps() + .get(appID).getUser(), + AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, + appID, applicationAttemptId); throw new InvalidApplicationMasterRequestException(message); } - + this.amLivelinessMonitor.receivedPing(applicationAttemptId); - RMApp app = this.rmContext.getRMApps().get(appID); - + // Setting the response id to 0 to identify if the // application master is register for the respective attemptid lastResponse.setResponseId(0); lock.setAllocateResponse(lastResponse); - LOG.info("AM registration " + applicationAttemptId); - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMAppAttemptRegistrationEvent(applicationAttemptId, request - .getHost(), request.getRpcPort(), request.getTrackingUrl())); - RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM, - "ApplicationMasterService", appID, applicationAttemptId); - - // Pick up min/max resource from scheduler... 
- RegisterApplicationMasterResponse response = recordFactory - .newRecordInstance(RegisterApplicationMasterResponse.class); - response.setMaximumResourceCapability(rScheduler - .getMaximumResourceCapability(app.getQueue())); - response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId) - .getSubmissionContext().getAMContainerSpec().getApplicationACLs()); - response.setQueue(app.getQueue()); - if (UserGroupInformation.isSecurityEnabled()) { - LOG.info("Setting client token master key"); - response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(rmContext - .getClientToAMTokenSecretManager() - .getMasterKey(applicationAttemptId).getEncoded())); - } - - // For work-preserving AM restart, retrieve previous attempts' containers - // and corresponding NM tokens. - if (app.getApplicationSubmissionContext() - .getKeepContainersAcrossApplicationAttempts()) { - List<Container> transferredContainers = rScheduler - .getTransferredContainers(applicationAttemptId); - if (!transferredContainers.isEmpty()) { - response.setContainersFromPreviousAttempts(transferredContainers); - List<NMToken> nmTokens = new ArrayList<NMToken>(); - for (Container container : transferredContainers) { - try { - NMToken token = rmContext.getNMTokenSecretManager() - .createAndGetNMToken(app.getUser(), applicationAttemptId, - container); - if (null != token) { - nmTokens.add(token); - } - } catch (IllegalArgumentException e) { - // if it's a DNS issue, throw UnknowHostException directly and - // that - // will be automatically retried by RMProxy in RPC layer. 
- if (e.getCause() instanceof UnknownHostException) { - throw (UnknownHostException) e.getCause(); - } - } - } - response.setNMTokensFromPreviousAttempts(nmTokens); - LOG.info("Application " + appID + " retrieved " - + transferredContainers.size() + " containers from previous" - + " attempts and " + nmTokens.size() + " NM tokens."); - } - } - response.setSchedulerResourceTypes(rScheduler - .getSchedulingResourceTypes()); - - return response; + return this.amsProcessor.registerApplicationMaster( + amrmTokenIdentifier.getApplicationAttemptId(), request); } } @@ -350,15 +260,8 @@ public class ApplicationMasterService extends AbstractService implements } this.amLivelinessMonitor.receivedPing(applicationAttemptId); - - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptUnregistrationEvent(applicationAttemptId, request - .getTrackingUrl(), request.getFinalApplicationStatus(), request - .getDiagnostics())); - - // For UnmanagedAMs, return true so they don't retry - return FinishApplicationMasterResponse.newInstance( - rmApp.getApplicationSubmissionContext().getUnmanagedAM()); + return this.amsProcessor.finishApplicationMaster( + applicationAttemptId, request); } } @@ -438,10 +341,8 @@ public class ApplicationMasterService extends AbstractService implements throw new InvalidApplicationMasterRequestException(message); } - AllocateResponse response = - recordFactory.newRecordInstance(AllocateResponse.class); - allocateInternal(amrmTokenIdentifier.getApplicationAttemptId(), - request, response); + AllocateResponse response = this.amsProcessor.allocate( + amrmTokenIdentifier.getApplicationAttemptId(), request); // update AMRMToken if the token is rolled-up MasterKeyData nextMasterKey = @@ -477,288 +378,7 @@ public class ApplicationMasterService extends AbstractService implements response.setResponseId(lastResponse.getResponseId() + 1); lock.setAllocateResponse(response); return response; - } - } - - protected void allocateInternal(ApplicationAttemptId 
appAttemptId, - AllocateRequest request, AllocateResponse allocateResponse) - throws YarnException { - - //filter illegal progress values - float filteredProgress = request.getProgress(); - if (Float.isNaN(filteredProgress) || - filteredProgress == Float.NEGATIVE_INFINITY || - filteredProgress < 0) { - request.setProgress(0); - } else if (filteredProgress > 1 || - filteredProgress == Float.POSITIVE_INFINITY) { - request.setProgress(1); - } - - // Send the status update to the appAttempt. - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptStatusupdateEvent(appAttemptId, request - .getProgress())); - - List<ResourceRequest> ask = request.getAskList(); - List<ContainerId> release = request.getReleaseList(); - - ResourceBlacklistRequest blacklistRequest = - request.getResourceBlacklistRequest(); - List<String> blacklistAdditions = - (blacklistRequest != null) ? - blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST; - List<String> blacklistRemovals = - (blacklistRequest != null) ? 
- blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; - RMApp app = - this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); - - // set label expression for Resource Requests if resourceName=ANY - ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); - for (ResourceRequest req : ask) { - if (null == req.getNodeLabelExpression() - && ResourceRequest.ANY.equals(req.getResourceName())) { - req.setNodeLabelExpression(asc.getNodeLabelExpression()); - } - } - - Resource maximumCapacity = rScheduler.getMaximumResourceCapability(); - - // sanity check - try { - RMServerUtils.normalizeAndValidateRequests(ask, - maximumCapacity, app.getQueue(), - rScheduler, rmContext); - } catch (InvalidResourceRequestException e) { - LOG.warn("Invalid resource ask by application " + appAttemptId, e); - throw e; - } - - try { - RMServerUtils.validateBlacklistRequest(blacklistRequest); - } catch (InvalidResourceBlacklistRequestException e) { - LOG.warn("Invalid blacklist request by application " + appAttemptId, e); - throw e; } - - // In the case of work-preserving AM restart, it's possible for the - // AM to release containers from the earlier attempt. - if (!app.getApplicationSubmissionContext() - .getKeepContainersAcrossApplicationAttempts()) { - try { - RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); - } catch (InvalidContainerReleaseException e) { - LOG.warn("Invalid container release by application " + appAttemptId, - e); - throw e; - } - } - - // Split Update Resource Requests into increase and decrease. - // No Exceptions are thrown here. All update errors are aggregated - // and returned to the AM. - List<UpdateContainerError> updateErrors = new ArrayList<>(); - ContainerUpdates containerUpdateRequests = - RMServerUtils.validateAndSplitUpdateResourceRequests( - rmContext, request, maximumCapacity, updateErrors); - - // Send new requests to appAttempt. 
- Allocation allocation; - RMAppAttemptState state = - app.getRMAppAttempt(appAttemptId).getAppAttemptState(); - if (state.equals(RMAppAttemptState.FINAL_SAVING) || - state.equals(RMAppAttemptState.FINISHING) || - app.isAppFinalStateStored()) { - LOG.warn(appAttemptId + " is in " + state + - " state, ignore container allocate request."); - allocation = EMPTY_ALLOCATION; - } else { - allocation = - this.rScheduler.allocate(appAttemptId, ask, release, - blacklistAdditions, blacklistRemovals, - containerUpdateRequests); - } - - if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { - LOG.info("blacklist are updated in Scheduler." + - "blacklistAdditions: " + blacklistAdditions + ", " + - "blacklistRemovals: " + blacklistRemovals); - } - RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - - if (allocation.getNMTokens() != null && - !allocation.getNMTokens().isEmpty()) { - allocateResponse.setNMTokens(allocation.getNMTokens()); - } - - // Notify the AM of container update errors - addToUpdateContainerErrors(allocateResponse, updateErrors); - - // update the response with the deltas of node status changes - List<RMNode> updatedNodes = new ArrayList<RMNode>(); - if(app.pullRMNodeUpdates(updatedNodes) > 0) { - List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>(); - for(RMNode rmNode: updatedNodes) { - SchedulerNodeReport schedulerNodeReport = - rScheduler.getNodeReport(rmNode.getNodeID()); - Resource used = BuilderUtils.newResource(0, 0); - int numContainers = 0; - if (schedulerNodeReport != null) { - used = schedulerNodeReport.getUsedResource(); - numContainers = schedulerNodeReport.getNumContainers(); - } - NodeId nodeId = rmNode.getNodeID(); - NodeReport report = - BuilderUtils.newNodeReport(nodeId, rmNode.getState(), - rmNode.getHttpAddress(), rmNode.getRackName(), used, - rmNode.getTotalCapability(), numContainers, - rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), - rmNode.getNodeLabels()); - - 
updatedNodeReports.add(report); - } - allocateResponse.setUpdatedNodes(updatedNodeReports); - } - - addToAllocatedContainers(allocateResponse, allocation.getContainers()); - - allocateResponse.setCompletedContainersStatuses(appAttempt - .pullJustFinishedContainers()); - allocateResponse.setAvailableResources(allocation.getResourceLimit()); - - addToContainerUpdates(appAttemptId, allocateResponse, allocation); - - allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); - - // add preemption to the allocateResponse message (if any) - allocateResponse - .setPreemptionMessage(generatePreemptionMessage(allocation)); - - // Set application priority - allocateResponse.setApplicationPriority(app - .getApplicationPriority()); - } - - private void addToContainerUpdates(ApplicationAttemptId appAttemptId, - AllocateResponse allocateResponse, Allocation allocation) { - // Handling increased containers - addToUpdatedContainers( - allocateResponse, ContainerUpdateType.INCREASE_RESOURCE, - allocation.getIncreasedContainers()); - - // Handling decreased containers - addToUpdatedContainers( - allocateResponse, ContainerUpdateType.DECREASE_RESOURCE, - allocation.getDecreasedContainers()); - - // Handling promoted containers - addToUpdatedContainers( - allocateResponse, ContainerUpdateType.PROMOTE_EXECUTION_TYPE, - allocation.getPromotedContainers()); - - // Handling demoted containers - addToUpdatedContainers( - allocateResponse, ContainerUpdateType.DEMOTE_EXECUTION_TYPE, - allocation.getDemotedContainers()); - - SchedulerApplicationAttempt applicationAttempt = ((AbstractYarnScheduler) - rScheduler).getApplicationAttempt(appAttemptId); - if (applicationAttempt != null) { - addToUpdateContainerErrors(allocateResponse, - ((AbstractYarnScheduler)rScheduler) - .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors()); - } - } - - protected void addToUpdateContainerErrors(AllocateResponse allocateResponse, - List<UpdateContainerError> updateContainerErrors) { - 
if (!updateContainerErrors.isEmpty()) { - if (allocateResponse.getUpdateErrors() != null - && !allocateResponse.getUpdateErrors().isEmpty()) { - updateContainerErrors = new ArrayList<>(updateContainerErrors); - updateContainerErrors.addAll(allocateResponse.getUpdateErrors()); - } - allocateResponse.setUpdateErrors(updateContainerErrors); - } - } - - protected void addToUpdatedContainers(AllocateResponse allocateResponse, - ContainerUpdateType updateType, List<Container> updatedContainers) { - if (updatedContainers != null && updatedContainers.size() > 0) { - ArrayList<UpdatedContainer> containersToSet = new ArrayList<>(); - if (allocateResponse.getUpdatedContainers() != null && - !allocateResponse.getUpdatedContainers().isEmpty()) { - containersToSet.addAll(allocateResponse.getUpdatedContainers()); - } - for (Container updatedContainer : updatedContainers) { - containersToSet.add( - UpdatedContainer.newInstance(updateType, updatedContainer)); - } - allocateResponse.setUpdatedContainers(containersToSet); - } - } - - protected void addToAllocatedContainers(AllocateResponse allocateResponse, - List<Container> allocatedContainers) { - if (allocateResponse.getAllocatedContainers() != null - && !allocateResponse.getAllocatedContainers().isEmpty()) { - allocatedContainers = new ArrayList<>(allocatedContainers); - allocatedContainers.addAll(allocateResponse.getAllocatedContainers()); - } - allocateResponse.setAllocatedContainers(allocatedContainers); - } - - private PreemptionMessage generatePreemptionMessage(Allocation allocation){ - PreemptionMessage pMsg = null; - // assemble strict preemption request - if (allocation.getStrictContainerPreemptions() != null) { - pMsg = - recordFactory.newRecordInstance(PreemptionMessage.class); - StrictPreemptionContract pStrict = - recordFactory.newRecordInstance(StrictPreemptionContract.class); - Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>(); - for (ContainerId cId : allocation.getStrictContainerPreemptions()) { 
- PreemptionContainer pc = - recordFactory.newRecordInstance(PreemptionContainer.class); - pc.setId(cId); - pCont.add(pc); - } - pStrict.setContainers(pCont); - pMsg.setStrictContract(pStrict); - } - - // assemble negotiable preemption request - if (allocation.getResourcePreemptions() != null && - allocation.getResourcePreemptions().size() > 0 && - allocation.getContainerPreemptions() != null && - allocation.getContainerPreemptions().size() > 0) { - if (pMsg == null) { - pMsg = - recordFactory.newRecordInstance(PreemptionMessage.class); - } - PreemptionContract contract = - recordFactory.newRecordInstance(PreemptionContract.class); - Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>(); - for (ContainerId cId : allocation.getContainerPreemptions()) { - PreemptionContainer pc = - recordFactory.newRecordInstance(PreemptionContainer.class); - pc.setId(cId); - pCont.add(pc); - } - List<PreemptionResourceRequest> pRes = new ArrayList<PreemptionResourceRequest>(); - for (ResourceRequest crr : allocation.getResourcePreemptions()) { - PreemptionResourceRequest prr = - recordFactory.newRecordInstance(PreemptionResourceRequest.class); - prr.setResourceRequest(crr); - pRes.add(prr); - } - contract.setContainers(pCont); - contract.setResourceRequest(pRes); - pMsg.setContract(contract); - } - - return pMsg; } public void registerAppAttempt(ApplicationAttemptId attemptId) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java new file mode 100644 index 0000000..ecb66c3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -0,0 +1,447 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.PreemptionContainer; +import org.apache.hadoop.yarn.api.records.PreemptionContract; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; 
+import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt + .RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event + .RMAppAttemptStatusupdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event + .RMAppAttemptUnregistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { + + private static final Log LOG = LogFactory.getLog(DefaultAMSProcessor.class); + + private final static List<Container> EMPTY_CONTAINER_LIST = + new ArrayList<Container>(); + protected static final Allocation 
EMPTY_ALLOCATION = new Allocation( + EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); + + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + private final RMContext rmContext; + private final YarnScheduler scheduler; + + DefaultAMSProcessor(RMContext rmContext, YarnScheduler scheduler) { + this.rmContext = rmContext; + this.scheduler = scheduler; + } + + public RegisterApplicationMasterResponse registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request) throws IOException { + + RMApp app = getRmContext().getRMApps().get( + applicationAttemptId.getApplicationId()); + LOG.info("AM registration " + applicationAttemptId); + getRmContext().getDispatcher().getEventHandler() + .handle( + new RMAppAttemptRegistrationEvent(applicationAttemptId, request + .getHost(), request.getRpcPort(), request.getTrackingUrl())); + RMAuditLogger.logSuccess(app.getUser(), + RMAuditLogger.AuditConstants.REGISTER_AM, + "ApplicationMasterService", app.getApplicationId(), + applicationAttemptId); + RegisterApplicationMasterResponse response = recordFactory + .newRecordInstance(RegisterApplicationMasterResponse.class); + response.setMaximumResourceCapability(getScheduler() + .getMaximumResourceCapability(app.getQueue())); + response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId) + .getSubmissionContext().getAMContainerSpec().getApplicationACLs()); + response.setQueue(app.getQueue()); + if (UserGroupInformation.isSecurityEnabled()) { + LOG.info("Setting client token master key"); + response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap( + getRmContext().getClientToAMTokenSecretManager() + .getMasterKey(applicationAttemptId).getEncoded())); + } + + // For work-preserving AM restart, retrieve previous attempts' containers + // and corresponding NM tokens. 
+ if (app.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + List<Container> transferredContainers = getScheduler() + .getTransferredContainers(applicationAttemptId); + if (!transferredContainers.isEmpty()) { + response.setContainersFromPreviousAttempts(transferredContainers); + List<NMToken> nmTokens = new ArrayList<NMToken>(); + for (Container container : transferredContainers) { + try { + NMToken token = getRmContext().getNMTokenSecretManager() + .createAndGetNMToken(app.getUser(), applicationAttemptId, + container); + if (null != token) { + nmTokens.add(token); + } + } catch (IllegalArgumentException e) { + // if it's a DNS issue, throw UnknowHostException directly and + // that + // will be automatically retried by RMProxy in RPC layer. + if (e.getCause() instanceof UnknownHostException) { + throw (UnknownHostException) e.getCause(); + } + } + } + response.setNMTokensFromPreviousAttempts(nmTokens); + LOG.info("Application " + app.getApplicationId() + " retrieved " + + transferredContainers.size() + " containers from previous" + + " attempts and " + nmTokens.size() + " NM tokens."); + } + } + + response.setSchedulerResourceTypes(getScheduler() + .getSchedulingResourceTypes()); + return response; + } + + public AllocateResponse allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request) throws YarnException { + + handleProgress(appAttemptId, request); + + List<ResourceRequest> ask = request.getAskList(); + List<ContainerId> release = request.getReleaseList(); + + ResourceBlacklistRequest blacklistRequest = + request.getResourceBlacklistRequest(); + List<String> blacklistAdditions = + (blacklistRequest != null) ? + blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST; + List<String> blacklistRemovals = + (blacklistRequest != null) ? 
+ blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; + RMApp app = + getRmContext().getRMApps().get(appAttemptId.getApplicationId()); + + // set label expression for Resource Requests if resourceName=ANY + ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); + for (ResourceRequest req : ask) { + if (null == req.getNodeLabelExpression() + && ResourceRequest.ANY.equals(req.getResourceName())) { + req.setNodeLabelExpression(asc.getNodeLabelExpression()); + } + } + + Resource maximumCapacity = getScheduler().getMaximumResourceCapability(); + + // sanity check + try { + RMServerUtils.normalizeAndValidateRequests(ask, + maximumCapacity, app.getQueue(), + getScheduler(), getRmContext()); + } catch (InvalidResourceRequestException e) { + LOG.warn("Invalid resource ask by application " + appAttemptId, e); + throw e; + } + + try { + RMServerUtils.validateBlacklistRequest(blacklistRequest); + } catch (InvalidResourceBlacklistRequestException e) { + LOG.warn("Invalid blacklist request by application " + appAttemptId, e); + throw e; + } + + // In the case of work-preserving AM restart, it's possible for the + // AM to release containers from the earlier attempt. + if (!app.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + try { + RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); + } catch (InvalidContainerReleaseException e) { + LOG.warn("Invalid container release by application " + appAttemptId, + e); + throw e; + } + } + + // Split Update Resource Requests into increase and decrease. + // No Exceptions are thrown here. All update errors are aggregated + // and returned to the AM. + List<UpdateContainerError> updateErrors = new ArrayList<>(); + ContainerUpdates containerUpdateRequests = + RMServerUtils.validateAndSplitUpdateResourceRequests( + getRmContext(), request, maximumCapacity, updateErrors); + + // Send new requests to appAttempt. 
+ Allocation allocation; + RMAppAttemptState state = + app.getRMAppAttempt(appAttemptId).getAppAttemptState(); + if (state.equals(RMAppAttemptState.FINAL_SAVING) || + state.equals(RMAppAttemptState.FINISHING) || + app.isAppFinalStateStored()) { + LOG.warn(appAttemptId + " is in " + state + + " state, ignore container allocate request."); + allocation = EMPTY_ALLOCATION; + } else { + allocation = + getScheduler().allocate(appAttemptId, ask, release, + blacklistAdditions, blacklistRemovals, + containerUpdateRequests); + } + + if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { + LOG.info("blacklist are updated in Scheduler." + + "blacklistAdditions: " + blacklistAdditions + ", " + + "blacklistRemovals: " + blacklistRemovals); + } + RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); + AllocateResponse allocateResponse = + recordFactory.newRecordInstance(AllocateResponse.class); + + if (allocation.getNMTokens() != null && + !allocation.getNMTokens().isEmpty()) { + allocateResponse.setNMTokens(allocation.getNMTokens()); + } + + // Notify the AM of container update errors + ApplicationMasterServiceUtils.addToUpdateContainerErrors( + allocateResponse, updateErrors); + + // update the response with the deltas of node status changes + handleNodeUpdates(app, allocateResponse); + + ApplicationMasterServiceUtils.addToAllocatedContainers( + allocateResponse, allocation.getContainers()); + + allocateResponse.setCompletedContainersStatuses(appAttempt + .pullJustFinishedContainers()); + allocateResponse.setAvailableResources(allocation.getResourceLimit()); + + addToContainerUpdates(allocateResponse, allocation, + ((AbstractYarnScheduler)getScheduler()) + .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors()); + + allocateResponse.setNumClusterNodes(getScheduler().getNumClusterNodes()); + + // add preemption to the allocateResponse message (if any) + allocateResponse + .setPreemptionMessage(generatePreemptionMessage(allocation)); + + // Set 
application priority + allocateResponse.setApplicationPriority(app + .getApplicationPriority()); + return allocateResponse; + } + + private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) { + List<RMNode> updatedNodes = new ArrayList<>(); + if(app.pullRMNodeUpdates(updatedNodes) > 0) { + List<NodeReport> updatedNodeReports = new ArrayList<>(); + for(RMNode rmNode: updatedNodes) { + SchedulerNodeReport schedulerNodeReport = + getScheduler().getNodeReport(rmNode.getNodeID()); + Resource used = BuilderUtils.newResource(0, 0); + int numContainers = 0; + if (schedulerNodeReport != null) { + used = schedulerNodeReport.getUsedResource(); + numContainers = schedulerNodeReport.getNumContainers(); + } + NodeId nodeId = rmNode.getNodeID(); + NodeReport report = + BuilderUtils.newNodeReport(nodeId, rmNode.getState(), + rmNode.getHttpAddress(), rmNode.getRackName(), used, + rmNode.getTotalCapability(), numContainers, + rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), + rmNode.getNodeLabels()); + + updatedNodeReports.add(report); + } + allocateResponse.setUpdatedNodes(updatedNodeReports); + } + } + + private void handleProgress(ApplicationAttemptId appAttemptId, + AllocateRequest request) { + //filter illegal progress values + float filteredProgress = request.getProgress(); + if (Float.isNaN(filteredProgress) || + filteredProgress == Float.NEGATIVE_INFINITY || + filteredProgress < 0) { + request.setProgress(0); + } else if (filteredProgress > 1 || + filteredProgress == Float.POSITIVE_INFINITY) { + request.setProgress(1); + } + + // Send the status update to the appAttempt. 
+ getRmContext().getDispatcher().getEventHandler().handle( + new RMAppAttemptStatusupdateEvent(appAttemptId, request + .getProgress())); + } + + public FinishApplicationMasterResponse finishApplicationMaster( + ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request) { + RMApp app = + getRmContext().getRMApps().get(applicationAttemptId.getApplicationId()); + // For UnmanagedAMs, return true so they don't retry + FinishApplicationMasterResponse response = + FinishApplicationMasterResponse.newInstance( + app.getApplicationSubmissionContext().getUnmanagedAM()); + getRmContext().getDispatcher().getEventHandler().handle( + new RMAppAttemptUnregistrationEvent(applicationAttemptId, request + .getTrackingUrl(), request.getFinalApplicationStatus(), request + .getDiagnostics())); + return response; + } + + private PreemptionMessage generatePreemptionMessage(Allocation allocation){ + PreemptionMessage pMsg = null; + // assemble strict preemption request + if (allocation.getStrictContainerPreemptions() != null) { + pMsg = + recordFactory.newRecordInstance(PreemptionMessage.class); + StrictPreemptionContract pStrict = + recordFactory.newRecordInstance(StrictPreemptionContract.class); + Set<PreemptionContainer> pCont = new HashSet<>(); + for (ContainerId cId : allocation.getStrictContainerPreemptions()) { + PreemptionContainer pc = + recordFactory.newRecordInstance(PreemptionContainer.class); + pc.setId(cId); + pCont.add(pc); + } + pStrict.setContainers(pCont); + pMsg.setStrictContract(pStrict); + } + + // assemble negotiable preemption request + if (allocation.getResourcePreemptions() != null && + allocation.getResourcePreemptions().size() > 0 && + allocation.getContainerPreemptions() != null && + allocation.getContainerPreemptions().size() > 0) { + if (pMsg == null) { + pMsg = + recordFactory.newRecordInstance(PreemptionMessage.class); + } + PreemptionContract contract = + recordFactory.newRecordInstance(PreemptionContract.class); + 
Set<PreemptionContainer> pCont = new HashSet<>(); + for (ContainerId cId : allocation.getContainerPreemptions()) { + PreemptionContainer pc = + recordFactory.newRecordInstance(PreemptionContainer.class); + pc.setId(cId); + pCont.add(pc); + } + List<PreemptionResourceRequest> pRes = new ArrayList<>(); + for (ResourceRequest crr : allocation.getResourcePreemptions()) { + PreemptionResourceRequest prr = + recordFactory.newRecordInstance(PreemptionResourceRequest.class); + prr.setResourceRequest(crr); + pRes.add(prr); + } + contract.setContainers(pCont); + contract.setResourceRequest(pRes); + pMsg.setContract(contract); + } + + return pMsg; + } + + protected RMContext getRmContext() { + return rmContext; + } + + protected YarnScheduler getScheduler() { + return scheduler; + } + + private static void addToContainerUpdates(AllocateResponse allocateResponse, + Allocation allocation, List<UpdateContainerError> updateContainerErrors) { + // Handling increased containers + ApplicationMasterServiceUtils.addToUpdatedContainers( + allocateResponse, ContainerUpdateType.INCREASE_RESOURCE, + allocation.getIncreasedContainers()); + + // Handling decreased containers + ApplicationMasterServiceUtils.addToUpdatedContainers( + allocateResponse, ContainerUpdateType.DECREASE_RESOURCE, + allocation.getDecreasedContainers()); + + // Handling promoted containers + ApplicationMasterServiceUtils.addToUpdatedContainers( + allocateResponse, ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + allocation.getPromotedContainers()); + + // Handling demoted containers + ApplicationMasterServiceUtils.addToUpdatedContainers( + allocateResponse, ContainerUpdateType.DEMOTE_EXECUTION_TYPE, + allocation.getDemotedContainers()); + + ApplicationMasterServiceUtils.addToUpdateContainerErrors( + allocateResponse, updateContainerErrors); + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index c8ee0c6..75eb143 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -37,8 +39,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; -import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import java.io.IOException; @@ -102,6 +101,84 @@ public class OpportunisticContainerAllocatorAMService private volatile List<RemoteNode> cachedNodes; private volatile long lastCacheUpdateTime; + class OpportunisticAMSProcessor extends DefaultAMSProcessor { + + OpportunisticAMSProcessor(RMContext rmContext, YarnScheduler + scheduler) { + super(rmContext, scheduler); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request) throws IOException { + final SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) + getScheduler()).getApplicationAttempt(applicationAttemptId); + if (appAttempt.getOpportunisticContainerContext() == null) { + OpportunisticContainerContext opCtx = + new OpportunisticContainerContext(); + opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator + .ContainerIdGenerator() { + @Override + public long generateContainerId() { + return appAttempt.getAppSchedulingInfo().getNewContainerId(); + } + }); + int tokenExpiryInterval = getConfig() + .getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, + 
YarnConfiguration. + DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS); + opCtx.updateAllocationParams( + getScheduler().getMinimumResourceCapability(), + getScheduler().getMaximumResourceCapability(), + getScheduler().getMinimumResourceCapability(), + tokenExpiryInterval); + appAttempt.setOpportunisticContainerContext(opCtx); + } + return super.registerApplicationMaster(applicationAttemptId, request); + } + + @Override + public AllocateResponse allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request) throws YarnException { + // Partition requests to GUARANTEED and OPPORTUNISTIC. + OpportunisticContainerAllocator.PartitionedResourceRequests + partitionedAsks = + oppContainerAllocator.partitionAskList(request.getAskList()); + + // Allocate OPPORTUNISTIC containers. + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler)rmContext.getScheduler()) + .getApplicationAttempt(appAttemptId); + + OpportunisticContainerContext oppCtx = + appAttempt.getOpportunisticContainerContext(); + oppCtx.updateNodeList(getLeastLoadedNodes()); + + List<Container> oppContainers = + oppContainerAllocator.allocateContainers( + request.getResourceBlacklistRequest(), + partitionedAsks.getOpportunistic(), appAttemptId, oppCtx, + ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); + + // Create RMContainers and update the NMTokens. + if (!oppContainers.isEmpty()) { + handleNewContainers(oppContainers, false); + appAttempt.updateNMTokens(oppContainers); + } + + // Allocate GUARANTEED containers. 
+ request.setAskList(partitionedAsks.getGuaranteed()); + + AllocateResponse response = super.allocate(appAttemptId, request); + if (!oppContainers.isEmpty()) { + ApplicationMasterServiceUtils.addToAllocatedContainers( + response, oppContainers); + } + return response; + } + } + public OpportunisticContainerAllocatorAMService(RMContext rmContext, YarnScheduler scheduler) { super(OpportunisticContainerAllocatorAMService.class.getName(), @@ -160,6 +237,11 @@ public class OpportunisticContainerAllocatorAMService } @Override + protected ApplicationMasterServiceProcessor createProcessor() { + return new OpportunisticAMSProcessor(rmContext, rmContext.getScheduler()); + } + + @Override public Server getServer(YarnRPC rpc, Configuration serverConf, InetSocketAddress addr, AMRMTokenSecretManager secretManager) { if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) { @@ -180,80 +262,6 @@ public class OpportunisticContainerAllocatorAMService } @Override - public RegisterApplicationMasterResponse registerApplicationMaster - (RegisterApplicationMasterRequest request) throws YarnException, - IOException { - final ApplicationAttemptId appAttemptId = getAppAttemptId(); - final SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) - rmContext.getScheduler()).getApplicationAttempt(appAttemptId); - if (appAttempt.getOpportunisticContainerContext() == null) { - OpportunisticContainerContext opCtx = new OpportunisticContainerContext(); - opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator - .ContainerIdGenerator() { - @Override - public long generateContainerId() { - return appAttempt.getAppSchedulingInfo().getNewContainerId(); - } - }); - int tokenExpiryInterval = getConfig() - .getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS); - opCtx.updateAllocationParams( - rmContext.getScheduler().getMinimumResourceCapability(), - 
rmContext.getScheduler().getMaximumResourceCapability(), - rmContext.getScheduler().getMinimumResourceCapability(), - tokenExpiryInterval); - appAttempt.setOpportunisticContainerContext(opCtx); - } - return super.registerApplicationMaster(request); - } - - @Override - public FinishApplicationMasterResponse finishApplicationMaster - (FinishApplicationMasterRequest request) throws YarnException, - IOException { - return super.finishApplicationMaster(request); - } - - @Override - protected void allocateInternal(ApplicationAttemptId appAttemptId, - AllocateRequest request, AllocateResponse allocateResponse) - throws YarnException { - - // Partition requests to GUARANTEED and OPPORTUNISTIC. - OpportunisticContainerAllocator.PartitionedResourceRequests - partitionedAsks = - oppContainerAllocator.partitionAskList(request.getAskList()); - - // Allocate OPPORTUNISTIC containers. - SchedulerApplicationAttempt appAttempt = - ((AbstractYarnScheduler)rmContext.getScheduler()) - .getApplicationAttempt(appAttemptId); - - OpportunisticContainerContext oppCtx = - appAttempt.getOpportunisticContainerContext(); - oppCtx.updateNodeList(getLeastLoadedNodes()); - - List<Container> oppContainers = - oppContainerAllocator.allocateContainers( - request.getResourceBlacklistRequest(), - partitionedAsks.getOpportunistic(), appAttemptId, oppCtx, - ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); - - // Create RMContainers and update the NMTokens. - if (!oppContainers.isEmpty()) { - handleNewContainers(oppContainers, false); - appAttempt.updateNMTokens(oppContainers); - addToAllocatedContainers(allocateResponse, oppContainers); - } - - // Allocate GUARANTEED containers. 
- request.setAskList(partitionedAsks.getGuaranteed()); - - super.allocateInternal(appAttemptId, request, allocateResponse); - } - - @Override public RegisterDistributedSchedulingAMResponse registerApplicationMasterForDistributedScheduling( RegisterApplicationMasterRequest request) throws YarnException, http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index eaeb6a2..d459492 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -79,6 +79,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo + .FifoScheduler; import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.util.resource.Resources; @@ -652,6 +655,11 @@ public class TestOpportunisticContainerAllocatorAMService { public RMContainerTokenSecretManager getContainerTokenSecretManager() { return new RMContainerTokenSecretManager(conf); } + + @Override + public ResourceScheduler getScheduler() { + return new FifoScheduler(); + } }; Container c = factory.newRecordInstance(Container.class); c.setExecutionType(ExecutionType.OPPORTUNISTIC); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org