Repository: hadoop
Updated Branches:
  refs/heads/branch-2 2d62af654 -> 043b7d133


YARN-6776. Refactor ApplicationMasterService to move actual processing logic to a separate class. (asuresh)

(cherry picked from commit 5496a34c0cb2b1a83cfa6b0aba5a77b05ff2d8f0)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/043b7d13
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/043b7d13
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/043b7d13

Branch: refs/heads/branch-2
Commit: 043b7d133ef48d590c62b786c53ec8e6d0e2e4c5
Parents: 2d62af6
Author: Arun Suresh <asur...@apache.org>
Authored: Mon Jul 10 14:34:58 2017 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Fri Aug 4 16:43:35 2017 -0700

----------------------------------------------------------------------
 .../ams/ApplicationMasterServiceProcessor.java  |  71 +++
 .../yarn/ams/ApplicationMasterServiceUtils.java |  89 ++++
 .../apache/hadoop/yarn/ams/package-info.java    |  24 +
 .../ApplicationMasterService.java               | 424 +-----------------
 .../resourcemanager/DefaultAMSProcessor.java    | 447 +++++++++++++++++++
 ...pportunisticContainerAllocatorAMService.java | 162 +++----
 ...pportunisticContainerAllocatorAMService.java |   8 +
 7 files changed, 746 insertions(+), 479 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java
new file mode 100644
index 0000000..b426f48
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.ams;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .FinishApplicationMasterRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+
+/**
+ * Interface to abstract out the actual processing logic of the
+ * Application Master Service.
+ */
+public interface ApplicationMasterServiceProcessor {
+
+  /**
+   * Register AM attempt.
+   * @param applicationAttemptId applicationAttemptId.
+   * @param request Register Request.
+   * @return Register Response.
+   * @throws IOException IOException.
+   */
+  RegisterApplicationMasterResponse registerApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      RegisterApplicationMasterRequest request) throws IOException;
+
+  /**
+   * Allocate call.
+   * @param appAttemptId appAttemptId.
+   * @param request Allocate Request.
+   * @return Allocate Response.
+   * @throws YarnException YarnException.
+   */
+  AllocateResponse allocate(ApplicationAttemptId appAttemptId,
+      AllocateRequest request) throws YarnException;
+
+  /**
+   * Finish AM.
+   * @param applicationAttemptId applicationAttemptId.
+   * @param request Finish AM Request.
+   * @return Finish AM response.
+   */
+  FinishApplicationMasterResponse finishApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      FinishApplicationMasterRequest request);
+
+}
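
The interface above is intentionally small, so an interceptor can wrap or replace the default processing with very little code. Below is a minimal pass-through sketch; the DelegatingAMSProcessor class name and its delegate field are illustrative assumptions and not part of this commit.

// Illustrative only: a processor that forwards every call to another processor.
// The class name and constructor are assumptions, not part of this commit.
package org.apache.hadoop.yarn.ams;

import java.io.IOException;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.exceptions.YarnException;

public class DelegatingAMSProcessor implements ApplicationMasterServiceProcessor {

  private final ApplicationMasterServiceProcessor delegate;

  public DelegatingAMSProcessor(ApplicationMasterServiceProcessor delegate) {
    this.delegate = delegate;
  }

  @Override
  public RegisterApplicationMasterResponse registerApplicationMaster(
      ApplicationAttemptId applicationAttemptId,
      RegisterApplicationMasterRequest request) throws IOException {
    // Pre- and post-processing hooks would go around this call.
    return delegate.registerApplicationMaster(applicationAttemptId, request);
  }

  @Override
  public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
      AllocateRequest request) throws YarnException {
    return delegate.allocate(appAttemptId, request);
  }

  @Override
  public FinishApplicationMasterResponse finishApplicationMaster(
      ApplicationAttemptId applicationAttemptId,
      FinishApplicationMasterRequest request) {
    return delegate.finishApplicationMaster(applicationAttemptId, request);
  }
}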

http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java
new file mode 100644
index 0000000..476da8b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.ams;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility methods to be used by {@link ApplicationMasterServiceProcessor}.
+ */
+public final class ApplicationMasterServiceUtils {
+
+  private ApplicationMasterServiceUtils() { }
+
+  /**
+   * Add update container errors to {@link AllocateResponse}.
+   * @param allocateResponse Allocate Response.
+   * @param updateContainerErrors Errors.
+   */
+  public static void addToUpdateContainerErrors(
+      AllocateResponse allocateResponse,
+      List<UpdateContainerError> updateContainerErrors) {
+    if (!updateContainerErrors.isEmpty()) {
+      if (allocateResponse.getUpdateErrors() != null
+          && !allocateResponse.getUpdateErrors().isEmpty()) {
+        updateContainerErrors.addAll(allocateResponse.getUpdateErrors());
+      }
+      allocateResponse.setUpdateErrors(updateContainerErrors);
+    }
+  }
+
+  /**
+   * Add updated containers to {@link AllocateResponse}.
+   * @param allocateResponse Allocate Response.
+   * @param updateType Update Type.
+   * @param updatedContainers Updated Containers.
+   */
+  public static void addToUpdatedContainers(AllocateResponse allocateResponse,
+      ContainerUpdateType updateType, List<Container> updatedContainers) {
+    if (updatedContainers != null && updatedContainers.size() > 0) {
+      ArrayList<UpdatedContainer> containersToSet = new ArrayList<>();
+      if (allocateResponse.getUpdatedContainers() != null &&
+          !allocateResponse.getUpdatedContainers().isEmpty()) {
+        containersToSet.addAll(allocateResponse.getUpdatedContainers());
+      }
+      for (Container updatedContainer : updatedContainers) {
+        containersToSet.add(
+            UpdatedContainer.newInstance(updateType, updatedContainer));
+      }
+      allocateResponse.setUpdatedContainers(containersToSet);
+    }
+  }
+
+  /**
+   * Add allocated containers to {@link AllocateResponse}.
+   * @param allocateResponse Allocate Response.
+   * @param allocatedContainers Allocated Containers.
+   */
+  public static void addToAllocatedContainers(AllocateResponse 
allocateResponse,
+      List<Container> allocatedContainers) {
+    if (allocateResponse.getAllocatedContainers() != null
+        && !allocateResponse.getAllocatedContainers().isEmpty()) {
+      allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
+    }
+    allocateResponse.setAllocatedContainers(allocatedContainers);
+  }
+}
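
These helpers append to whatever is already set on the response instead of overwriting it, which is why DefaultAMSProcessor below can call them repeatedly while assembling a single AllocateResponse. A short usage sketch follows, assuming a response object and mutable lists produced elsewhere; the AllocateResponseAssembler class and its merge method are illustrative names, not part of this commit.

// Illustrative only: shows how a processor might fold scheduler output into
// an AllocateResponse using the utilities above. Names are assumptions.
import java.util.List;

import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;

public final class AllocateResponseAssembler {

  private AllocateResponseAssembler() { }

  public static void merge(AllocateResponse response,
      List<Container> newlyAllocated,
      List<Container> promoted,
      List<UpdateContainerError> errors) {
    // Note: addToAllocatedContainers and addToUpdateContainerErrors may append
    // the response's existing entries into the given lists, so pass mutable lists.
    ApplicationMasterServiceUtils.addToAllocatedContainers(response, newlyAllocated);
    ApplicationMasterServiceUtils.addToUpdatedContainers(response,
        ContainerUpdateType.PROMOTE_EXECUTION_TYPE, promoted);
    ApplicationMasterServiceUtils.addToUpdateContainerErrors(response, errors);
  }
}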

http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/package-info.java
new file mode 100644
index 0000000..b23534e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Public API for Application Master Service interceptors.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.ams;
+import org.apache.hadoop.classification.InterfaceAudience;
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 9d18ced..5ed0a80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -21,12 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -37,10 +33,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -52,30 +48,11 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
-import org.apache.hadoop.yarn.api.records.NMToken;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.PreemptionContainer;
-import org.apache.hadoop.yarn.api.records.PreemptionContract;
-import org.apache.hadoop.yarn.api.records.PreemptionMessage;
-import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
-import org.apache.hadoop.yarn.api.records.UpdateContainerError;
-import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import 
org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import 
org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
-import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
-import 
org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -86,25 +63,12 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
-import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
-import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
-import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .SchedulerApplicationAttempt;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security
     .AMRMTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -124,6 +88,12 @@ public class ApplicationMasterService extends 
AbstractService implements
   private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> 
responseMap =
       new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
   protected final RMContext rmContext;
+  private final ApplicationMasterServiceProcessor amsProcessor;
+
+  public ApplicationMasterService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    this(ApplicationMasterService.class.getName(), rmContext, scheduler);
+  }
 
   public ApplicationMasterService(String name, RMContext rmContext,
       YarnScheduler scheduler) {
@@ -131,11 +101,11 @@ public class ApplicationMasterService extends 
AbstractService implements
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.rScheduler = scheduler;
     this.rmContext = rmContext;
+    this.amsProcessor = createProcessor();
   }
 
-  public ApplicationMasterService(RMContext rmContext,
-      YarnScheduler scheduler) {
-    this(ApplicationMasterService.class.getName(), rmContext, scheduler);
+  protected ApplicationMasterServiceProcessor createProcessor() {
+    return new DefaultAMSProcessor(rmContext, rScheduler);
   }
 
   @Override
@@ -230,82 +200,22 @@ public class ApplicationMasterService extends 
AbstractService implements
                 + appID;
         LOG.warn(message);
         RMAuditLogger.logFailure(
-          this.rmContext.getRMApps()
-            .get(appID).getUser(),
-          AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
-          appID, applicationAttemptId);
+            this.rmContext.getRMApps()
+                .get(appID).getUser(),
+            AuditConstants.REGISTER_AM, "", "ApplicationMasterService", 
message,
+            appID, applicationAttemptId);
         throw new InvalidApplicationMasterRequestException(message);
       }
-      
+
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
-      RMApp app = this.rmContext.getRMApps().get(appID);
-      
+
       // Setting the response id to 0 to identify if the
       // application master is register for the respective attemptid
       lastResponse.setResponseId(0);
       lock.setAllocateResponse(lastResponse);
-      LOG.info("AM registration " + applicationAttemptId);
-      this.rmContext
-        .getDispatcher()
-        .getEventHandler()
-        .handle(
-          new RMAppAttemptRegistrationEvent(applicationAttemptId, request
-            .getHost(), request.getRpcPort(), request.getTrackingUrl()));
-      RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM,
-        "ApplicationMasterService", appID, applicationAttemptId);
-
-      // Pick up min/max resource from scheduler...
-      RegisterApplicationMasterResponse response = recordFactory
-          .newRecordInstance(RegisterApplicationMasterResponse.class);
-      response.setMaximumResourceCapability(rScheduler
-          .getMaximumResourceCapability(app.getQueue()));
-      response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
-          .getSubmissionContext().getAMContainerSpec().getApplicationACLs());
-      response.setQueue(app.getQueue());
-      if (UserGroupInformation.isSecurityEnabled()) {
-        LOG.info("Setting client token master key");
-        response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(rmContext
-            .getClientToAMTokenSecretManager()
-            .getMasterKey(applicationAttemptId).getEncoded()));        
-      }
-
-      // For work-preserving AM restart, retrieve previous attempts' containers
-      // and corresponding NM tokens.
-      if (app.getApplicationSubmissionContext()
-          .getKeepContainersAcrossApplicationAttempts()) {
-        List<Container> transferredContainers = rScheduler
-            .getTransferredContainers(applicationAttemptId);
-        if (!transferredContainers.isEmpty()) {
-          response.setContainersFromPreviousAttempts(transferredContainers);
-          List<NMToken> nmTokens = new ArrayList<NMToken>();
-          for (Container container : transferredContainers) {
-            try {
-              NMToken token = rmContext.getNMTokenSecretManager()
-                  .createAndGetNMToken(app.getUser(), applicationAttemptId,
-                      container);
-              if (null != token) {
-                nmTokens.add(token);
-              }
-            } catch (IllegalArgumentException e) {
-              // if it's a DNS issue, throw UnknowHostException directly and
-              // that
-              // will be automatically retried by RMProxy in RPC layer.
-              if (e.getCause() instanceof UnknownHostException) {
-                throw (UnknownHostException) e.getCause();
-              }
-            }
-          }
-          response.setNMTokensFromPreviousAttempts(nmTokens);
-          LOG.info("Application " + appID + " retrieved "
-              + transferredContainers.size() + " containers from previous"
-              + " attempts and " + nmTokens.size() + " NM tokens.");
-        }
-      }
 
-      response.setSchedulerResourceTypes(rScheduler
-        .getSchedulingResourceTypes());
-
-      return response;
+      return this.amsProcessor.registerApplicationMaster(
+          amrmTokenIdentifier.getApplicationAttemptId(), request);
     }
   }
 
@@ -350,15 +260,8 @@ public class ApplicationMasterService extends 
AbstractService implements
       }
 
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
-
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
-              .getTrackingUrl(), request.getFinalApplicationStatus(), request
-              .getDiagnostics()));
-
-      // For UnmanagedAMs, return true so they don't retry
-      return FinishApplicationMasterResponse.newInstance(
-          rmApp.getApplicationSubmissionContext().getUnmanagedAM());
+      return this.amsProcessor.finishApplicationMaster(
+          applicationAttemptId, request);
     }
   }
 
@@ -438,10 +341,8 @@ public class ApplicationMasterService extends 
AbstractService implements
         throw new InvalidApplicationMasterRequestException(message);
       }
 
-      AllocateResponse response =
-          recordFactory.newRecordInstance(AllocateResponse.class);
-      allocateInternal(amrmTokenIdentifier.getApplicationAttemptId(),
-          request, response);
+      AllocateResponse response = this.amsProcessor.allocate(
+          amrmTokenIdentifier.getApplicationAttemptId(), request);
 
       // update AMRMToken if the token is rolled-up
       MasterKeyData nextMasterKey =
@@ -477,288 +378,7 @@ public class ApplicationMasterService extends 
AbstractService implements
       response.setResponseId(lastResponse.getResponseId() + 1);
       lock.setAllocateResponse(response);
       return response;
-    }    
-  }
-
-  protected void allocateInternal(ApplicationAttemptId appAttemptId,
-      AllocateRequest request, AllocateResponse allocateResponse)
-      throws YarnException {
-
-    //filter illegal progress values
-    float filteredProgress = request.getProgress();
-    if (Float.isNaN(filteredProgress) ||
-        filteredProgress == Float.NEGATIVE_INFINITY ||
-        filteredProgress < 0) {
-      request.setProgress(0);
-    } else if (filteredProgress > 1 ||
-        filteredProgress == Float.POSITIVE_INFINITY) {
-      request.setProgress(1);
-    }
-
-    // Send the status update to the appAttempt.
-    this.rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptStatusupdateEvent(appAttemptId, request
-            .getProgress()));
-
-    List<ResourceRequest> ask = request.getAskList();
-    List<ContainerId> release = request.getReleaseList();
-
-    ResourceBlacklistRequest blacklistRequest =
-        request.getResourceBlacklistRequest();
-    List<String> blacklistAdditions =
-        (blacklistRequest != null) ?
-            blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
-    List<String> blacklistRemovals =
-        (blacklistRequest != null) ?
-            blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
-    RMApp app =
-        this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
-
-    // set label expression for Resource Requests if resourceName=ANY
-    ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
-    for (ResourceRequest req : ask) {
-      if (null == req.getNodeLabelExpression()
-          && ResourceRequest.ANY.equals(req.getResourceName())) {
-        req.setNodeLabelExpression(asc.getNodeLabelExpression());
-      }
-    }
-
-    Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
-
-    // sanity check
-    try {
-      RMServerUtils.normalizeAndValidateRequests(ask,
-          maximumCapacity, app.getQueue(),
-          rScheduler, rmContext);
-    } catch (InvalidResourceRequestException e) {
-      LOG.warn("Invalid resource ask by application " + appAttemptId, e);
-      throw e;
-    }
-
-    try {
-      RMServerUtils.validateBlacklistRequest(blacklistRequest);
-    }  catch (InvalidResourceBlacklistRequestException e) {
-      LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
-      throw e;
     }
-
-    // In the case of work-preserving AM restart, it's possible for the
-    // AM to release containers from the earlier attempt.
-    if (!app.getApplicationSubmissionContext()
-        .getKeepContainersAcrossApplicationAttempts()) {
-      try {
-        RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
-      } catch (InvalidContainerReleaseException e) {
-        LOG.warn("Invalid container release by application " + appAttemptId,
-            e);
-        throw e;
-      }
-    }
-
-    // Split Update Resource Requests into increase and decrease.
-    // No Exceptions are thrown here. All update errors are aggregated
-    // and returned to the AM.
-    List<UpdateContainerError> updateErrors = new ArrayList<>();
-    ContainerUpdates containerUpdateRequests =
-        RMServerUtils.validateAndSplitUpdateResourceRequests(
-        rmContext, request, maximumCapacity, updateErrors);
-
-    // Send new requests to appAttempt.
-    Allocation allocation;
-    RMAppAttemptState state =
-        app.getRMAppAttempt(appAttemptId).getAppAttemptState();
-    if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
-        state.equals(RMAppAttemptState.FINISHING) ||
-        app.isAppFinalStateStored()) {
-      LOG.warn(appAttemptId + " is in " + state +
-               " state, ignore container allocate request.");
-      allocation = EMPTY_ALLOCATION;
-    } else {
-      allocation =
-          this.rScheduler.allocate(appAttemptId, ask, release,
-              blacklistAdditions, blacklistRemovals,
-              containerUpdateRequests);
-    }
-
-    if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
-      LOG.info("blacklist are updated in Scheduler." +
-          "blacklistAdditions: " + blacklistAdditions + ", " +
-          "blacklistRemovals: " + blacklistRemovals);
-    }
-    RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
-
-    if (allocation.getNMTokens() != null &&
-        !allocation.getNMTokens().isEmpty()) {
-      allocateResponse.setNMTokens(allocation.getNMTokens());
-    }
-
-    // Notify the AM of container update errors
-    addToUpdateContainerErrors(allocateResponse, updateErrors);
-
-    // update the response with the deltas of node status changes
-    List<RMNode> updatedNodes = new ArrayList<RMNode>();
-    if(app.pullRMNodeUpdates(updatedNodes) > 0) {
-      List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
-      for(RMNode rmNode: updatedNodes) {
-        SchedulerNodeReport schedulerNodeReport =
-            rScheduler.getNodeReport(rmNode.getNodeID());
-        Resource used = BuilderUtils.newResource(0, 0);
-        int numContainers = 0;
-        if (schedulerNodeReport != null) {
-          used = schedulerNodeReport.getUsedResource();
-          numContainers = schedulerNodeReport.getNumContainers();
-        }
-        NodeId nodeId = rmNode.getNodeID();
-        NodeReport report =
-            BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
-                rmNode.getHttpAddress(), rmNode.getRackName(), used,
-                rmNode.getTotalCapability(), numContainers,
-                rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
-                rmNode.getNodeLabels());
-
-        updatedNodeReports.add(report);
-      }
-      allocateResponse.setUpdatedNodes(updatedNodeReports);
-    }
-
-    addToAllocatedContainers(allocateResponse, allocation.getContainers());
-
-    allocateResponse.setCompletedContainersStatuses(appAttempt
-        .pullJustFinishedContainers());
-    allocateResponse.setAvailableResources(allocation.getResourceLimit());
-
-    addToContainerUpdates(appAttemptId, allocateResponse, allocation);
-
-    allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
-
-    // add preemption to the allocateResponse message (if any)
-    allocateResponse
-        .setPreemptionMessage(generatePreemptionMessage(allocation));
-
-    // Set application priority
-    allocateResponse.setApplicationPriority(app
-        .getApplicationPriority());
-  }
-
-  private void addToContainerUpdates(ApplicationAttemptId appAttemptId,
-      AllocateResponse allocateResponse, Allocation allocation) {
-    // Handling increased containers
-    addToUpdatedContainers(
-        allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
-        allocation.getIncreasedContainers());
-
-    // Handling decreased containers
-    addToUpdatedContainers(
-        allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
-        allocation.getDecreasedContainers());
-
-    // Handling promoted containers
-    addToUpdatedContainers(
-        allocateResponse, ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
-        allocation.getPromotedContainers());
-
-    // Handling demoted containers
-    addToUpdatedContainers(
-        allocateResponse, ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
-        allocation.getDemotedContainers());
-
-    SchedulerApplicationAttempt applicationAttempt = ((AbstractYarnScheduler)
-        rScheduler).getApplicationAttempt(appAttemptId);
-    if (applicationAttempt != null) {
-      addToUpdateContainerErrors(allocateResponse,
-          ((AbstractYarnScheduler)rScheduler)
-              
.getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
-    }
-  }
-
-  protected void addToUpdateContainerErrors(AllocateResponse allocateResponse,
-      List<UpdateContainerError> updateContainerErrors) {
-    if (!updateContainerErrors.isEmpty()) {
-      if (allocateResponse.getUpdateErrors() != null
-          && !allocateResponse.getUpdateErrors().isEmpty()) {
-        updateContainerErrors = new ArrayList<>(updateContainerErrors);
-        updateContainerErrors.addAll(allocateResponse.getUpdateErrors());
-      }
-      allocateResponse.setUpdateErrors(updateContainerErrors);
-    }
-  }
-
-  protected void addToUpdatedContainers(AllocateResponse allocateResponse,
-      ContainerUpdateType updateType, List<Container> updatedContainers) {
-    if (updatedContainers != null && updatedContainers.size() > 0) {
-      ArrayList<UpdatedContainer> containersToSet = new ArrayList<>();
-      if (allocateResponse.getUpdatedContainers() != null &&
-          !allocateResponse.getUpdatedContainers().isEmpty()) {
-        containersToSet.addAll(allocateResponse.getUpdatedContainers());
-      }
-      for (Container updatedContainer : updatedContainers) {
-        containersToSet.add(
-            UpdatedContainer.newInstance(updateType, updatedContainer));
-      }
-      allocateResponse.setUpdatedContainers(containersToSet);
-    }
-  }
-
-  protected void addToAllocatedContainers(AllocateResponse allocateResponse,
-      List<Container> allocatedContainers) {
-    if (allocateResponse.getAllocatedContainers() != null
-        && !allocateResponse.getAllocatedContainers().isEmpty()) {
-      allocatedContainers = new ArrayList<>(allocatedContainers);
-      allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
-    }
-    allocateResponse.setAllocatedContainers(allocatedContainers);
-  }
-
-  private PreemptionMessage generatePreemptionMessage(Allocation allocation){
-    PreemptionMessage pMsg = null;
-    // assemble strict preemption request
-    if (allocation.getStrictContainerPreemptions() != null) {
-       pMsg =
-        recordFactory.newRecordInstance(PreemptionMessage.class);
-      StrictPreemptionContract pStrict =
-          recordFactory.newRecordInstance(StrictPreemptionContract.class);
-      Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
-      for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
-        PreemptionContainer pc =
-            recordFactory.newRecordInstance(PreemptionContainer.class);
-        pc.setId(cId);
-        pCont.add(pc);
-      }
-      pStrict.setContainers(pCont);
-      pMsg.setStrictContract(pStrict);
-    }
-
-    // assemble negotiable preemption request
-    if (allocation.getResourcePreemptions() != null &&
-        allocation.getResourcePreemptions().size() > 0 &&
-        allocation.getContainerPreemptions() != null &&
-        allocation.getContainerPreemptions().size() > 0) {
-      if (pMsg == null) {
-        pMsg =
-            recordFactory.newRecordInstance(PreemptionMessage.class);
-      }
-      PreemptionContract contract =
-          recordFactory.newRecordInstance(PreemptionContract.class);
-      Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
-      for (ContainerId cId : allocation.getContainerPreemptions()) {
-        PreemptionContainer pc =
-            recordFactory.newRecordInstance(PreemptionContainer.class);
-        pc.setId(cId);
-        pCont.add(pc);
-      }
-      List<PreemptionResourceRequest> pRes = new 
ArrayList<PreemptionResourceRequest>();
-      for (ResourceRequest crr : allocation.getResourcePreemptions()) {
-        PreemptionResourceRequest prr =
-            recordFactory.newRecordInstance(PreemptionResourceRequest.class);
-        prr.setResourceRequest(crr);
-        pRes.add(prr);
-      }
-      contract.setContainers(pCont);
-      contract.setResourceRequest(pRes);
-      pMsg.setContract(contract);
-    }
-    
-    return pMsg;
   }
 
   public void registerAppAttempt(ApplicationAttemptId attemptId) {
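
With the processing logic extracted, the extension point a subclass of ApplicationMasterService needs is createProcessor(). The sketch below shows one way to wire in a custom processor through that hook; CustomAMService and the DelegatingAMSProcessor wrapper from the earlier sketch are illustrative names, not part of this commit. OpportunisticContainerAllocatorAMService further down uses the same pattern with its own DefaultAMSProcessor subclass.

// Illustrative only: subclass wiring a wrapper processor through the new
// createProcessor() hook. Class names are assumptions, not part of this commit.
package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.ams.DelegatingAMSProcessor; // illustrative wrapper from the earlier sketch
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;

public class CustomAMService extends ApplicationMasterService {

  public CustomAMService(RMContext rmContext, YarnScheduler scheduler) {
    super(CustomAMService.class.getName(), rmContext, scheduler);
  }

  @Override
  protected ApplicationMasterServiceProcessor createProcessor() {
    // Wrap the DefaultAMSProcessor created by the parent; the wrapper can
    // intercept register/allocate/finish calls before delegating.
    return new DelegatingAMSProcessor(super.createProcessor());
  }
}

Note that createProcessor() is invoked from the ApplicationMasterService constructor, so an override should not rely on fields initialized in the subclass constructor body.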

http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
new file mode 100644
index 0000000..ecb66c3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionContract;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import 
org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
+    .RMAppAttemptState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event
+    .RMAppAttemptStatusupdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event
+    .RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
+
+  private static final Log LOG = LogFactory.getLog(DefaultAMSProcessor.class);
+
+  private final static List<Container> EMPTY_CONTAINER_LIST =
+      new ArrayList<Container>();
+  protected static final Allocation EMPTY_ALLOCATION = new Allocation(
+      EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  private final RMContext rmContext;
+  private final YarnScheduler scheduler;
+
+  DefaultAMSProcessor(RMContext rmContext, YarnScheduler scheduler) {
+    this.rmContext = rmContext;
+    this.scheduler = scheduler;
+  }
+
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      RegisterApplicationMasterRequest request) throws IOException {
+
+    RMApp app = getRmContext().getRMApps().get(
+        applicationAttemptId.getApplicationId());
+    LOG.info("AM registration " + applicationAttemptId);
+    getRmContext().getDispatcher().getEventHandler()
+        .handle(
+            new RMAppAttemptRegistrationEvent(applicationAttemptId, request
+                .getHost(), request.getRpcPort(), request.getTrackingUrl()));
+    RMAuditLogger.logSuccess(app.getUser(),
+        RMAuditLogger.AuditConstants.REGISTER_AM,
+        "ApplicationMasterService", app.getApplicationId(),
+        applicationAttemptId);
+    RegisterApplicationMasterResponse response = recordFactory
+        .newRecordInstance(RegisterApplicationMasterResponse.class);
+    response.setMaximumResourceCapability(getScheduler()
+        .getMaximumResourceCapability(app.getQueue()));
+    response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
+        .getSubmissionContext().getAMContainerSpec().getApplicationACLs());
+    response.setQueue(app.getQueue());
+    if (UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Setting client token master key");
+      response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(
+          getRmContext().getClientToAMTokenSecretManager()
+          .getMasterKey(applicationAttemptId).getEncoded()));
+    }
+
+    // For work-preserving AM restart, retrieve previous attempts' containers
+    // and corresponding NM tokens.
+    if (app.getApplicationSubmissionContext()
+        .getKeepContainersAcrossApplicationAttempts()) {
+      List<Container> transferredContainers = getScheduler()
+          .getTransferredContainers(applicationAttemptId);
+      if (!transferredContainers.isEmpty()) {
+        response.setContainersFromPreviousAttempts(transferredContainers);
+        List<NMToken> nmTokens = new ArrayList<NMToken>();
+        for (Container container : transferredContainers) {
+          try {
+            NMToken token = getRmContext().getNMTokenSecretManager()
+                .createAndGetNMToken(app.getUser(), applicationAttemptId,
+                    container);
+            if (null != token) {
+              nmTokens.add(token);
+            }
+          } catch (IllegalArgumentException e) {
+            // If it's a DNS issue, throw UnknownHostException directly;
+            // it will be automatically retried by RMProxy in the RPC layer.
+            if (e.getCause() instanceof UnknownHostException) {
+              throw (UnknownHostException) e.getCause();
+            }
+          }
+        }
+        response.setNMTokensFromPreviousAttempts(nmTokens);
+        LOG.info("Application " + app.getApplicationId() + " retrieved "
+            + transferredContainers.size() + " containers from previous"
+            + " attempts and " + nmTokens.size() + " NM tokens.");
+      }
+    }
+
+    response.setSchedulerResourceTypes(getScheduler()
+        .getSchedulingResourceTypes());
+    return response;
+  }
+
+  public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
+      AllocateRequest request) throws YarnException {
+
+    handleProgress(appAttemptId, request);
+
+    List<ResourceRequest> ask = request.getAskList();
+    List<ContainerId> release = request.getReleaseList();
+
+    ResourceBlacklistRequest blacklistRequest =
+        request.getResourceBlacklistRequest();
+    List<String> blacklistAdditions =
+        (blacklistRequest != null) ?
+            blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
+    List<String> blacklistRemovals =
+        (blacklistRequest != null) ?
+            blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
+    RMApp app =
+        getRmContext().getRMApps().get(appAttemptId.getApplicationId());
+
+    // set label expression for Resource Requests if resourceName=ANY
+    ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
+    for (ResourceRequest req : ask) {
+      if (null == req.getNodeLabelExpression()
+          && ResourceRequest.ANY.equals(req.getResourceName())) {
+        req.setNodeLabelExpression(asc.getNodeLabelExpression());
+      }
+    }
+
+    Resource maximumCapacity = getScheduler().getMaximumResourceCapability();
+
+    // sanity check
+    try {
+      RMServerUtils.normalizeAndValidateRequests(ask,
+          maximumCapacity, app.getQueue(),
+          getScheduler(), getRmContext());
+    } catch (InvalidResourceRequestException e) {
+      LOG.warn("Invalid resource ask by application " + appAttemptId, e);
+      throw e;
+    }
+
+    try {
+      RMServerUtils.validateBlacklistRequest(blacklistRequest);
+    }  catch (InvalidResourceBlacklistRequestException e) {
+      LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
+      throw e;
+    }
+
+    // In the case of work-preserving AM restart, it's possible for the
+    // AM to release containers from the earlier attempt.
+    if (!app.getApplicationSubmissionContext()
+        .getKeepContainersAcrossApplicationAttempts()) {
+      try {
+        RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
+      } catch (InvalidContainerReleaseException e) {
+        LOG.warn("Invalid container release by application " + appAttemptId,
+            e);
+        throw e;
+      }
+    }
+
+    // Split Update Resource Requests into increase and decrease.
+    // No Exceptions are thrown here. All update errors are aggregated
+    // and returned to the AM.
+    List<UpdateContainerError> updateErrors = new ArrayList<>();
+    ContainerUpdates containerUpdateRequests =
+        RMServerUtils.validateAndSplitUpdateResourceRequests(
+            getRmContext(), request, maximumCapacity, updateErrors);
+
+    // Send new requests to appAttempt.
+    Allocation allocation;
+    RMAppAttemptState state =
+        app.getRMAppAttempt(appAttemptId).getAppAttemptState();
+    if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
+        state.equals(RMAppAttemptState.FINISHING) ||
+        app.isAppFinalStateStored()) {
+      LOG.warn(appAttemptId + " is in " + state +
+          " state, ignore container allocate request.");
+      allocation = EMPTY_ALLOCATION;
+    } else {
+      allocation =
+          getScheduler().allocate(appAttemptId, ask, release,
+              blacklistAdditions, blacklistRemovals,
+              containerUpdateRequests);
+    }
+
+    if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
+      LOG.info("blacklist are updated in Scheduler." +
+          "blacklistAdditions: " + blacklistAdditions + ", " +
+          "blacklistRemovals: " + blacklistRemovals);
+    }
+    RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
+    AllocateResponse allocateResponse =
+        recordFactory.newRecordInstance(AllocateResponse.class);
+
+    if (allocation.getNMTokens() != null &&
+        !allocation.getNMTokens().isEmpty()) {
+      allocateResponse.setNMTokens(allocation.getNMTokens());
+    }
+
+    // Notify the AM of container update errors
+    ApplicationMasterServiceUtils.addToUpdateContainerErrors(
+        allocateResponse, updateErrors);
+
+    // update the response with the deltas of node status changes
+    handleNodeUpdates(app, allocateResponse);
+
+    ApplicationMasterServiceUtils.addToAllocatedContainers(
+        allocateResponse, allocation.getContainers());
+
+    allocateResponse.setCompletedContainersStatuses(appAttempt
+        .pullJustFinishedContainers());
+    allocateResponse.setAvailableResources(allocation.getResourceLimit());
+
+    addToContainerUpdates(allocateResponse, allocation,
+        ((AbstractYarnScheduler)getScheduler())
+            .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
+
+    allocateResponse.setNumClusterNodes(getScheduler().getNumClusterNodes());
+
+    // add preemption to the allocateResponse message (if any)
+    allocateResponse
+        .setPreemptionMessage(generatePreemptionMessage(allocation));
+
+    // Set application priority
+    allocateResponse.setApplicationPriority(app
+        .getApplicationPriority());
+    return allocateResponse;
+  }
+
+  private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) 
{
+    List<RMNode> updatedNodes = new ArrayList<>();
+    if(app.pullRMNodeUpdates(updatedNodes) > 0) {
+      List<NodeReport> updatedNodeReports = new ArrayList<>();
+      for(RMNode rmNode: updatedNodes) {
+        SchedulerNodeReport schedulerNodeReport =
+            getScheduler().getNodeReport(rmNode.getNodeID());
+        Resource used = BuilderUtils.newResource(0, 0);
+        int numContainers = 0;
+        if (schedulerNodeReport != null) {
+          used = schedulerNodeReport.getUsedResource();
+          numContainers = schedulerNodeReport.getNumContainers();
+        }
+        NodeId nodeId = rmNode.getNodeID();
+        NodeReport report =
+            BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
+                rmNode.getHttpAddress(), rmNode.getRackName(), used,
+                rmNode.getTotalCapability(), numContainers,
+                rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
+                rmNode.getNodeLabels());
+
+        updatedNodeReports.add(report);
+      }
+      allocateResponse.setUpdatedNodes(updatedNodeReports);
+    }
+  }
+
+  private void handleProgress(ApplicationAttemptId appAttemptId,
+      AllocateRequest request) {
+    //filter illegal progress values
+    float filteredProgress = request.getProgress();
+    if (Float.isNaN(filteredProgress) ||
+        filteredProgress == Float.NEGATIVE_INFINITY ||
+        filteredProgress < 0) {
+      request.setProgress(0);
+    } else if (filteredProgress > 1 ||
+        filteredProgress == Float.POSITIVE_INFINITY) {
+      request.setProgress(1);
+    }
+
+    // Send the status update to the appAttempt.
+    getRmContext().getDispatcher().getEventHandler().handle(
+        new RMAppAttemptStatusupdateEvent(appAttemptId, request
+            .getProgress()));
+  }
+
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      FinishApplicationMasterRequest request) {
+    RMApp app =
+        
getRmContext().getRMApps().get(applicationAttemptId.getApplicationId());
+    // For UnmanagedAMs, return true so they don't retry
+    FinishApplicationMasterResponse response =
+        FinishApplicationMasterResponse.newInstance(
+            app.getApplicationSubmissionContext().getUnmanagedAM());
+    getRmContext().getDispatcher().getEventHandler().handle(
+        new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
+            .getTrackingUrl(), request.getFinalApplicationStatus(), request
+            .getDiagnostics()));
+    return response;
+  }
+
+  private PreemptionMessage generatePreemptionMessage(Allocation allocation){
+    PreemptionMessage pMsg = null;
+    // assemble strict preemption request
+    if (allocation.getStrictContainerPreemptions() != null) {
+      pMsg =
+          recordFactory.newRecordInstance(PreemptionMessage.class);
+      StrictPreemptionContract pStrict =
+          recordFactory.newRecordInstance(StrictPreemptionContract.class);
+      Set<PreemptionContainer> pCont = new HashSet<>();
+      for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
+        PreemptionContainer pc =
+            recordFactory.newRecordInstance(PreemptionContainer.class);
+        pc.setId(cId);
+        pCont.add(pc);
+      }
+      pStrict.setContainers(pCont);
+      pMsg.setStrictContract(pStrict);
+    }
+
+    // assemble negotiable preemption request
+    if (allocation.getResourcePreemptions() != null &&
+        allocation.getResourcePreemptions().size() > 0 &&
+        allocation.getContainerPreemptions() != null &&
+        allocation.getContainerPreemptions().size() > 0) {
+      if (pMsg == null) {
+        pMsg =
+            recordFactory.newRecordInstance(PreemptionMessage.class);
+      }
+      PreemptionContract contract =
+          recordFactory.newRecordInstance(PreemptionContract.class);
+      Set<PreemptionContainer> pCont = new HashSet<>();
+      for (ContainerId cId : allocation.getContainerPreemptions()) {
+        PreemptionContainer pc =
+            recordFactory.newRecordInstance(PreemptionContainer.class);
+        pc.setId(cId);
+        pCont.add(pc);
+      }
+      List<PreemptionResourceRequest> pRes = new ArrayList<>();
+      for (ResourceRequest crr : allocation.getResourcePreemptions()) {
+        PreemptionResourceRequest prr =
+            recordFactory.newRecordInstance(PreemptionResourceRequest.class);
+        prr.setResourceRequest(crr);
+        pRes.add(prr);
+      }
+      contract.setContainers(pCont);
+      contract.setResourceRequest(pRes);
+      pMsg.setContract(contract);
+    }
+
+    return pMsg;
+  }
+
+  protected RMContext getRmContext() {
+    return rmContext;
+  }
+
+  protected YarnScheduler getScheduler() {
+    return scheduler;
+  }
+
+  private static void addToContainerUpdates(AllocateResponse allocateResponse,
+      Allocation allocation, List<UpdateContainerError> updateContainerErrors) 
{
+    // Handling increased containers
+    ApplicationMasterServiceUtils.addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
+        allocation.getIncreasedContainers());
+
+    // Handling decreased containers
+    ApplicationMasterServiceUtils.addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
+        allocation.getDecreasedContainers());
+
+    // Handling promoted containers
+    ApplicationMasterServiceUtils.addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+        allocation.getPromotedContainers());
+
+    // Handling demoted containers
+    ApplicationMasterServiceUtils.addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+        allocation.getDemotedContainers());
+
+    ApplicationMasterServiceUtils.addToUpdateContainerErrors(
+        allocateResponse, updateContainerErrors);
+  }
+}
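
One detail worth calling out from DefaultAMSProcessor is the progress filtering in handleProgress(), which clamps whatever the AM reports into the [0, 1] range before the status-update event is dispatched. The same rule as a standalone sketch; the ProgressClamp helper is illustrative and not part of this commit.

// Illustrative only: mirrors the progress filtering done in handleProgress().
final class ProgressClamp {

  private ProgressClamp() { }

  // NaN, -Infinity and negative values become 0; +Infinity and values above 1 become 1.
  static float clamp(float reported) {
    if (Float.isNaN(reported) || reported == Float.NEGATIVE_INFINITY || reported < 0) {
      return 0f;
    } else if (reported > 1 || reported == Float.POSITIVE_INFINITY) {
      return 1f;
    }
    return reported;
  }
}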

http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index c8ee0c6..75eb143 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -37,8 +39,6 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
-import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 
@@ -69,7 +69,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan
 
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 
 import java.io.IOException;
@@ -102,6 +101,84 @@ public class OpportunisticContainerAllocatorAMService
   private volatile List<RemoteNode> cachedNodes;
   private volatile long lastCacheUpdateTime;
 
+  class OpportunisticAMSProcessor extends DefaultAMSProcessor {
+
+    OpportunisticAMSProcessor(RMContext rmContext, YarnScheduler
+        scheduler) {
+      super(rmContext, scheduler);
+    }
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        ApplicationAttemptId applicationAttemptId,
+        RegisterApplicationMasterRequest request) throws IOException {
+      final SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
+          getScheduler()).getApplicationAttempt(applicationAttemptId);
+      if (appAttempt.getOpportunisticContainerContext() == null) {
+        OpportunisticContainerContext opCtx =
+            new OpportunisticContainerContext();
+        opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator
+            .ContainerIdGenerator() {
+          @Override
+          public long generateContainerId() {
+            return appAttempt.getAppSchedulingInfo().getNewContainerId();
+          }
+        });
+        int tokenExpiryInterval = getConfig()
+            .getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
+                YarnConfiguration.
+                    DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
+        opCtx.updateAllocationParams(
+            getScheduler().getMinimumResourceCapability(),
+            getScheduler().getMaximumResourceCapability(),
+            getScheduler().getMinimumResourceCapability(),
+            tokenExpiryInterval);
+        appAttempt.setOpportunisticContainerContext(opCtx);
+      }
+      return super.registerApplicationMaster(applicationAttemptId, request);
+    }
+
+    @Override
+    public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
+        AllocateRequest request) throws YarnException {
+      // Partition requests to GUARANTEED and OPPORTUNISTIC.
+      OpportunisticContainerAllocator.PartitionedResourceRequests
+          partitionedAsks =
+          oppContainerAllocator.partitionAskList(request.getAskList());
+
+      // Allocate OPPORTUNISTIC containers.
+      SchedulerApplicationAttempt appAttempt =
+          ((AbstractYarnScheduler)rmContext.getScheduler())
+              .getApplicationAttempt(appAttemptId);
+
+      OpportunisticContainerContext oppCtx =
+          appAttempt.getOpportunisticContainerContext();
+      oppCtx.updateNodeList(getLeastLoadedNodes());
+
+      List<Container> oppContainers =
+          oppContainerAllocator.allocateContainers(
+              request.getResourceBlacklistRequest(),
+              partitionedAsks.getOpportunistic(), appAttemptId, oppCtx,
+              ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
+
+      // Create RMContainers and update the NMTokens.
+      if (!oppContainers.isEmpty()) {
+        handleNewContainers(oppContainers, false);
+        appAttempt.updateNMTokens(oppContainers);
+      }
+
+      // Allocate GUARANTEED containers.
+      request.setAskList(partitionedAsks.getGuaranteed());
+
+      AllocateResponse response = super.allocate(appAttemptId, request);
+      if (!oppContainers.isEmpty()) {
+        ApplicationMasterServiceUtils.addToAllocatedContainers(
+            response, oppContainers);
+      }
+      return response;
+    }
+  }
+
   public OpportunisticContainerAllocatorAMService(RMContext rmContext,
       YarnScheduler scheduler) {
     super(OpportunisticContainerAllocatorAMService.class.getName(),
@@ -160,6 +237,11 @@ public class OpportunisticContainerAllocatorAMService
   }
 
   @Override
+  protected ApplicationMasterServiceProcessor createProcessor() {
+    return new OpportunisticAMSProcessor(rmContext, rmContext.getScheduler());
+  }
+
+  @Override
   public Server getServer(YarnRPC rpc, Configuration serverConf,
       InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
     if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) {
@@ -180,80 +262,6 @@ public class OpportunisticContainerAllocatorAMService
   }
 
   @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster
-      (RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
-    final ApplicationAttemptId appAttemptId = getAppAttemptId();
-    final SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
-        rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
-    if (appAttempt.getOpportunisticContainerContext() == null) {
-      OpportunisticContainerContext opCtx = new OpportunisticContainerContext();
-      opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator
-          .ContainerIdGenerator() {
-        @Override
-        public long generateContainerId() {
-          return appAttempt.getAppSchedulingInfo().getNewContainerId();
-        }
-      });
-      int tokenExpiryInterval = getConfig()
-          .getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
-              YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
-      opCtx.updateAllocationParams(
-          rmContext.getScheduler().getMinimumResourceCapability(),
-          rmContext.getScheduler().getMaximumResourceCapability(),
-          rmContext.getScheduler().getMinimumResourceCapability(),
-          tokenExpiryInterval);
-      appAttempt.setOpportunisticContainerContext(opCtx);
-    }
-    return super.registerApplicationMaster(request);
-  }
-
-  @Override
-  public FinishApplicationMasterResponse finishApplicationMaster
-      (FinishApplicationMasterRequest request) throws YarnException,
-      IOException {
-    return super.finishApplicationMaster(request);
-  }
-
-  @Override
-  protected void allocateInternal(ApplicationAttemptId appAttemptId,
-      AllocateRequest request, AllocateResponse allocateResponse)
-      throws YarnException {
-
-    // Partition requests to GUARANTEED and OPPORTUNISTIC.
-    OpportunisticContainerAllocator.PartitionedResourceRequests
-        partitionedAsks =
-        oppContainerAllocator.partitionAskList(request.getAskList());
-
-    // Allocate OPPORTUNISTIC containers.
-    SchedulerApplicationAttempt appAttempt =
-        ((AbstractYarnScheduler)rmContext.getScheduler())
-            .getApplicationAttempt(appAttemptId);
-
-    OpportunisticContainerContext oppCtx =
-        appAttempt.getOpportunisticContainerContext();
-    oppCtx.updateNodeList(getLeastLoadedNodes());
-
-    List<Container> oppContainers =
-        oppContainerAllocator.allocateContainers(
-            request.getResourceBlacklistRequest(),
-            partitionedAsks.getOpportunistic(), appAttemptId, oppCtx,
-            ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
-
-    // Create RMContainers and update the NMTokens.
-    if (!oppContainers.isEmpty()) {
-      handleNewContainers(oppContainers, false);
-      appAttempt.updateNMTokens(oppContainers);
-      addToAllocatedContainers(allocateResponse, oppContainers);
-    }
-
-    // Allocate GUARANTEED containers.
-    request.setAskList(partitionedAsks.getGuaranteed());
-
-    super.allocateInternal(appAttemptId, request, allocateResponse);
-  }
-
-  @Override
   public RegisterDistributedSchedulingAMResponse
       registerApplicationMasterForDistributedScheduling(
       RegisterApplicationMasterRequest request) throws YarnException,
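
To make the new extension point concrete: after this refactoring, a service can swap in its own
ApplicationMasterServiceProcessor by overriding createProcessor(), exactly as
OpportunisticContainerAllocatorAMService does above. The following is a minimal, hypothetical sketch
(not part of this commit) that assumes only the signatures visible in this patch; AuditingAMService
and AuditingAMSProcessor are invented names.

package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;

/**
 * Hypothetical example only: an AM service that plugs in its own processor
 * via the createProcessor() hook introduced by this patch.
 */
public class AuditingAMService extends ApplicationMasterService {

  private final RMContext rmCtx;

  /** Processor that delegates to the default logic and adds an audit line. */
  static class AuditingAMSProcessor extends DefaultAMSProcessor {

    AuditingAMSProcessor(RMContext rmContext, YarnScheduler scheduler) {
      super(rmContext, scheduler);
    }

    @Override
    public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
        AllocateRequest request) throws YarnException {
      // Run the default allocate path first, then record the outcome.
      AllocateResponse response = super.allocate(appAttemptId, request);
      System.out.println("allocate(" + appAttemptId + ") returned "
          + response.getAllocatedContainers().size() + " containers");
      return response;
    }
  }

  public AuditingAMService(RMContext rmContext, YarnScheduler scheduler) {
    // Mirrors the super(...) call used by the opportunistic AM service above.
    super(AuditingAMService.class.getName(), rmContext, scheduler);
    this.rmCtx = rmContext;
  }

  @Override
  protected ApplicationMasterServiceProcessor createProcessor() {
    return new AuditingAMSProcessor(rmCtx, rmCtx.getScheduler());
  }
}

The processor delegates to DefaultAMSProcessor for the normal register/allocate/finish handling and
only layers extra behaviour on top, which is the same pattern OpportunisticAMSProcessor follows in
the diff above.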

http://git-wip-us.apache.org/repos/asf/hadoop/blob/043b7d13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index eaeb6a2..d459492 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -79,6 +79,9 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
+    .FifoScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -652,6 +655,11 @@ public class TestOpportunisticContainerAllocatorAMService {
       public RMContainerTokenSecretManager getContainerTokenSecretManager() {
         return new RMContainerTokenSecretManager(conf);
       }
+
+      @Override
+      public ResourceScheduler getScheduler() {
+        return new FifoScheduler();
+      }
     };
     Container c = factory.newRecordInstance(Container.class);
     c.setExecutionType(ExecutionType.OPPORTUNISTIC);

