YARN-3666. Federation Intercepting and propagating AM-home RM communications. (Botong Huang via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7c29cf65
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7c29cf65
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7c29cf65

Branch: refs/heads/YARN-2915
Commit: 7c29cf6505ab404a543422f9590ffb5efa0063e5
Parents: f37a7c3
Author: Subru Krishnan <su...@apache.org>
Authored: Wed May 31 13:21:09 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Thu Jun 22 14:09:47 2017 -0700

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |   7 +
 .../amrmproxy/FederationInterceptor.java        | 510 +++++++++++++++++++
 .../amrmproxy/TestAMRMProxyService.java         |   1 +
 .../amrmproxy/TestFederationInterceptor.java    | 167 ++++++
 .../TestableFederationInterceptor.java          | 133 +++++
 5 files changed, 818 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c29cf65/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index ee51094..034f03c 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -594,4 +594,11 @@
     <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
   </Match>
 
+  <!-- Ignore false alert for RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor" />
+    <Method name="registerApplicationMaster" />
+    <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE" />
+  </Match>
+
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c29cf65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
new file mode 100644
index 0000000..5f82d69
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the AbstractRequestInterceptor and provides an implementation that
+ * federates YARN RMs, allowing an application to scale across multiple YARN
+ * sub-clusters. All the federation-specific logic is encapsulated in this
+ * class. This is always the last interceptor in the chain.
+ */
+public class FederationInterceptor extends AbstractRequestInterceptor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationInterceptor.class);
+
+  /**
+   * The home sub-cluster is the sub-cluster where the AM container runs.
+   */
+  private ApplicationMasterProtocol homeRM;
+  private SubClusterId homeSubClusterId;
+
+  /**
+   * Used to keep track of the container id and the sub-cluster RM that created
+   * the container, so that we know which sub-cluster to forward later requests
+   * about existing containers to.
+   */
+  private Map<ContainerId, SubClusterId> containerIdToSubClusterIdMap;
+
+  /**
+   * The original registration request that was sent by the AM. This instance is
+   * reused to register/re-register with all the sub-cluster RMs.
+   */
+  private RegisterApplicationMasterRequest amRegistrationRequest;
+
+  /**
+   * The original registration response from home RM. This instance is reused
+   * for duplicate register requests from the AM, triggered by timeouts between
+   * the AM and AMRMProxy.
+   */
+  private RegisterApplicationMasterResponse amRegistrationResponse;
+
+  /** The proxy ugi used to talk to home RM. */
+  private UserGroupInformation appOwner;
+
+  /**
+   * Creates an instance of the FederationInterceptor class.
+   */
+  public FederationInterceptor() {
+    this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
+    this.amRegistrationResponse = null;
+  }
+
+  /**
+   * Initializes the instance using the specified context.
+   */
+  @Override
+  public void init(AMRMProxyApplicationContext appContext) {
+    super.init(appContext);
+    LOG.info("Initializing Federation Interceptor");
+
+    // Update the conf if available
+    Configuration conf = appContext.getConf();
+    if (conf == null) {
+      conf = getConf();
+    } else {
+      setConf(conf);
+    }
+
+    try {
+      this.appOwner = UserGroupInformation.createProxyUser(appContext.getUser(),
+          UserGroupInformation.getCurrentUser());
+    } catch (Exception ex) {
+      throw new YarnRuntimeException(ex);
+    }
+
+    this.homeSubClusterId =
+        SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
+    this.homeRM = createHomeRMProxy(appContext);
+  }
+
+  /**
+   * Sends the application master's registration request to the home RM.
+   *
+   * Between the AM and AMRMProxy, FederationInterceptor modifies the RM
+   * behavior, so that when the AM registers more than once, it returns the same
+   * successful register response instead of throwing
+   * {@link InvalidApplicationMasterRequestException}. Furthermore, we present
+   * ourselves to the AM as an RM that never fails over. When the actual RM
+   * fails over, we always re-register automatically.
+   *
+   * We do this because FederationInterceptor can receive concurrent register
+   * requests from the AM because of the timeout between the AM and AMRMProxy,
+   * which is shorter than the timeout plus failover time between
+   * FederationInterceptor (AMRMProxy) and the RM.
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
+    // If AM is calling with a different request, complain
+    if (this.amRegistrationRequest != null
+        && !this.amRegistrationRequest.equals(request)) {
+      throw new YarnException("A different request body received. AM should"
+          + " not call registerApplicationMaster with different request body");
+    }
+
+    // Save the registration request. This will be used for registering with
+    // secondary sub-clusters using UAMs, as well as for re-registering later.
+    this.amRegistrationRequest = request;
+
+    /*
+     * Present to AM as if we are the RM that never fails over. When actual RM
+     * fails over, we always re-register automatically.
+     *
+     * We do this because it is possible for the AM to send duplicate register
+     * requests because of timeouts. When that happens, it is fine to simply return
+     * the success message. Out of all outstanding register threads, only the
+     * last one will still have an unbroken RPC connection and successfully
+     * return the response.
+     */
+    if (this.amRegistrationResponse != null) {
+      return this.amRegistrationResponse;
+    }
+
+    /*
+     * Send a registration request to the home resource manager. Note that here
+     * we don't register with other sub-cluster resource managers because that
+     * will prevent us from using new sub-clusters that get added while the AM
+     * is running and will break the elasticity feature. The registration with
+     * the other sub-cluster RM will be done lazily as needed later.
+     */
+    try {
+      this.amRegistrationResponse =
+          this.homeRM.registerApplicationMaster(request);
+    } catch (InvalidApplicationMasterRequestException e) {
+      if (e.getMessage()
+          .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) {
+        // Some other register thread might have succeeded in the meantime
+        if (this.amRegistrationResponse != null) {
+          LOG.info("Other concurrent thread registered successfully, "
+              + "simply return the same success register response");
+          return this.amRegistrationResponse;
+        }
+      }
+      // This is a real issue, throw back to AM
+      throw e;
+    }
+
+    // The queue this application belongs to will be used for getting the
+    // AMRMProxy policy from the state store.
+    String queue = this.amRegistrationResponse.getQueue();
+    if (queue == null) {
+      LOG.warn("Received null queue for application "
+          + getApplicationContext().getApplicationAttemptId().getApplicationId()
+          + " from home subcluster. Will use default queue name "
+          + YarnConfiguration.DEFAULT_QUEUE_NAME
+          + " for getting AMRMProxyPolicy");
+    } else {
+      LOG.info("Application "
+          + getApplicationContext().getApplicationAttemptId().getApplicationId()
+          + " belongs to queue " + queue);
+    }
+
+    return this.amRegistrationResponse;
+  }
+
+  /**
+   * Sends the heartbeats to the home RM and the secondary sub-cluster RMs that
+   * are being used by the application.
+   */
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException {
+
+    try {
+      // Split the heartbeat request into multiple requests, one for each
+      // sub-cluster RM that is used by this application.
+      Map<SubClusterId, AllocateRequest> requests =
+          splitAllocateRequest(request);
+
+      // Send the request to the home RM and get the response
+      AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
+          requests.get(this.homeSubClusterId), this.homeRM,
+          this.amRegistrationRequest,
+          getApplicationContext().getApplicationAttemptId());
+
+      // If the resource manager sent us a new token, add to the current user
+      if (homeResponse.getAMRMToken() != null) {
+        LOG.debug("Received new AMRMToken");
+        YarnServerSecurityUtils.updateAMRMToken(homeResponse.getAMRMToken(),
+            this.appOwner, getConf());
+      }
+
+      // Merge the responses from home and secondary sub-cluster RMs
+      homeResponse = mergeAllocateResponses(homeResponse);
+
+      // return the final response to the application master.
+      return homeResponse;
+    } catch (IOException ex) {
+      LOG.error("Exception encountered while processing heart beat", ex);
+      throw new YarnException(ex);
+    }
+  }
+
+  /**
+   * Sends the finish application master request to all the resource managers
+   * used by the application.
+   */
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request)
+      throws YarnException, IOException {
+
+    FinishApplicationMasterResponse homeResponse =
+        AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
+            this.amRegistrationRequest,
+            getApplicationContext().getApplicationAttemptId());
+    return homeResponse;
+  }
+
+  @Override
+  public void setNextInterceptor(RequestInterceptor next) {
+    throw new YarnRuntimeException(
+        "setNextInterceptor is being called on FederationInterceptor. "
+            + "It should always be used as the last interceptor in the chain");
+  }
+
+  /**
+   * This is called when the application pipeline is being destroyed. We will
+   * release all the resources that we are holding in this call.
+   */
+  @Override
+  public void shutdown() {
+    super.shutdown();
+  }
+
+  /**
+   * Returns an instance of the ApplicationMasterProtocol proxy class that is
+   * used to connect to the home resource manager.
+   *
+   * @param appContext AMRMProxyApplicationContext
+   * @return the proxy created
+   */
+  protected ApplicationMasterProtocol createHomeRMProxy(
+      AMRMProxyApplicationContext appContext) {
+    try {
+      return FederationProxyProviderUtil.createRMProxy(appContext.getConf(),
+          ApplicationMasterProtocol.class, this.homeSubClusterId, this.appOwner,
+          appContext.getAMRMToken());
+    } catch (Exception ex) {
+      throw new YarnRuntimeException(ex);
+    }
+  }
+
+  /**
+   * In federation, the heartbeat request needs to be sent to all the
+   * sub-clusters from which the AM has requested containers. This method splits the
+   * specified AllocateRequest from the AM and creates a new request for each
+   * sub-cluster RM.
+   */
+  private Map<SubClusterId, AllocateRequest> splitAllocateRequest(
+      AllocateRequest request) throws YarnException {
+    Map<SubClusterId, AllocateRequest> requestMap =
+        new HashMap<SubClusterId, AllocateRequest>();
+
+    // Create heart beat request for home sub-cluster resource manager
+    findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request,
+        requestMap);
+
+    if (!isNullOrEmpty(request.getAskList())) {
+      AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+          this.homeSubClusterId, request, requestMap);
+      newRequest.getAskList().addAll(request.getAskList());
+    }
+
+    if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
+        request.getResourceBlacklistRequest().getBlacklistAdditions())) {
+      for (String resourceName : request.getResourceBlacklistRequest()
+          .getBlacklistAdditions()) {
+        AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+            this.homeSubClusterId, request, requestMap);
+        newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
+            .add(resourceName);
+      }
+    }
+
+    if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
+        request.getResourceBlacklistRequest().getBlacklistRemovals())) {
+      for (String resourceName : request.getResourceBlacklistRequest()
+          .getBlacklistRemovals()) {
+        AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+            this.homeSubClusterId, request, requestMap);
+        newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
+            .add(resourceName);
+      }
+    }
+
+    if (!isNullOrEmpty(request.getReleaseList())) {
+      for (ContainerId cid : request.getReleaseList()) {
+        if (warnIfNotExists(cid, "release")) {
+          SubClusterId subClusterId =
+              this.containerIdToSubClusterIdMap.get(cid);
+          AllocateRequest newRequest = requestMap.get(subClusterId);
+          newRequest.getReleaseList().add(cid);
+        }
+      }
+    }
+
+    if (!isNullOrEmpty(request.getUpdateRequests())) {
+      for (UpdateContainerRequest ucr : request.getUpdateRequests()) {
+        if (warnIfNotExists(ucr.getContainerId(), "update")) {
+          SubClusterId subClusterId =
+              this.containerIdToSubClusterIdMap.get(ucr.getContainerId());
+          AllocateRequest newRequest = requestMap.get(subClusterId);
+          newRequest.getUpdateRequests().add(ucr);
+        }
+      }
+    }
+
+    return requestMap;
+  }
+
+  /**
+   * Merges the responses from other sub-clusters that we received
+   * asynchronously with the specified home cluster response and keeps track of
+   * the containers received from each sub-cluster resource manager.
+   */
+  private AllocateResponse mergeAllocateResponses(
+      AllocateResponse homeResponse) {
+    // Timing issue: we need to remove the completed containers first and then
+    // save the new ones.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Remove containers: "
+          + homeResponse.getCompletedContainersStatuses());
+      LOG.debug("Adding containers: " + homeResponse.getAllocatedContainers());
+    }
+    removeFinishedContainersFromCache(
+        homeResponse.getCompletedContainersStatuses());
+    cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
+        this.homeSubClusterId);
+
+    return homeResponse;
+  }
+
+  /**
+   * Removes the finished containers from the local cache.
+   */
+  private void removeFinishedContainersFromCache(
+      List<ContainerStatus> finishedContainers) {
+    for (ContainerStatus container : finishedContainers) {
+      if (containerIdToSubClusterIdMap
+          .containsKey(container.getContainerId())) {
+        containerIdToSubClusterIdMap.remove(container.getContainerId());
+      }
+    }
+  }
+
+  /**
+   * Add allocated containers to cache mapping.
+   */
+  private void cacheAllocatedContainers(List<Container> containers,
+      SubClusterId subClusterId) {
+    for (Container container : containers) {
+      if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
+        SubClusterId existingSubClusterId =
+            containerIdToSubClusterIdMap.get(container.getId());
+        if (existingSubClusterId.equals(subClusterId)) {
+          // When RM fails over, the new RM master might send out the same
+          // container allocation more than once. Just move on in this case.
+          LOG.warn(
+              "Duplicate containerID: {} found in the allocated containers"
+                  + " from same subcluster: {}, so ignoring.",
+              container.getId(), subClusterId);
+        } else {
+          // The same container allocation from different subclusters,
+          // something is wrong.
+          // TODO: YARN-6667 if some subcluster RM is configured wrong, we
+          // should not fail the entire heartbeat.
+          throw new YarnRuntimeException(
+              "Duplicate containerID found in the allocated containers. This"
+                  + " can happen if the RM epoch is not configured properly."
+                  + " ContainerId: " + container.getId().toString()
+                  + " ApplicationId: "
+                  + getApplicationContext().getApplicationAttemptId()
+                  + " From RM: " + subClusterId
+                  + " . Previous container was from subcluster: "
+                  + existingSubClusterId);
+        }
+      }
+
+      containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
+    }
+  }
+
+  /**
+   * Check to see if an AllocateRequest exists in the Map for the specified
+   * sub-cluster. If not found, create a new one, copy the value of responseId
+   * and progress from the originalAMRequest, save it in the specified Map and
+   * return the new instance. If found, just return the old instance.
+   */
+  private static AllocateRequest findOrCreateAllocateRequestForSubCluster(
+      SubClusterId subClusterId, AllocateRequest originalAMRequest,
+      Map<SubClusterId, AllocateRequest> requestMap) {
+    AllocateRequest newRequest = null;
+    if (requestMap.containsKey(subClusterId)) {
+      newRequest = requestMap.get(subClusterId);
+    } else {
+      newRequest = createAllocateRequest();
+      newRequest.setResponseId(originalAMRequest.getResponseId());
+      newRequest.setProgress(originalAMRequest.getProgress());
+      requestMap.put(subClusterId, newRequest);
+    }
+
+    return newRequest;
+  }
+
+  /**
+   * Create an empty AllocateRequest instance.
+   */
+  private static AllocateRequest createAllocateRequest() {
+    AllocateRequest request =
+        AllocateRequest.newInstance(0, 0, null, null, null);
+    request.setAskList(new ArrayList<ResourceRequest>());
+    request.setReleaseList(new ArrayList<ContainerId>());
+    ResourceBlacklistRequest blackList =
+        ResourceBlacklistRequest.newInstance(null, null);
+    blackList.setBlacklistAdditions(new ArrayList<String>());
+    blackList.setBlacklistRemovals(new ArrayList<String>());
+    request.setResourceBlacklistRequest(blackList);
+    request.setUpdateRequests(new ArrayList<UpdateContainerRequest>());
+    return request;
+  }
+
+  /**
+   * Check to see if the specified containerId exists in the cache and log an
+   * error if not found.
+   *
+   * @param containerId the container id
+   * @param actionName the name of the action
+   * @return true if the container exists in the map, false otherwise
+   */
+  private boolean warnIfNotExists(ContainerId containerId, String actionName) {
+    if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) {
+      LOG.error("AM is trying to {} a container {} that does not exist. ",
+          actionName, containerId.toString());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Utility method to check if the specified Collection is null or empty
+   *
+   * @param c the collection object
+   * @param <T> element type of the collection
+   * @return whether it is null or empty
+   */
+  public static <T> boolean isNullOrEmpty(Collection<T> c) {
+    return (c == null || c.size() == 0);
+  }
+}
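
Note: the class Javadoc above requires FederationInterceptor to be the last
interceptor in the AMRMProxy chain. Below is a minimal configuration sketch,
assuming a single-entry pipeline and an illustrative "SC-home" cluster id (both
are assumptions, not part of this patch), using the same YarnConfiguration keys
that the test in this commit exercises:

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public final class FederationInterceptorConfigSketch {
      public static YarnConfiguration createConf() {
        YarnConfiguration conf = new YarnConfiguration();
        // AMRMProxy must be enabled for any request interceptor chain to run.
        conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
        conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
        // FederationInterceptor never forwards to a next interceptor (see
        // setNextInterceptor above), so it must be the last pipeline entry.
        conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
            "org.apache.hadoop.yarn.server.nodemanager.amrmproxy"
                + ".FederationInterceptor");
        // Cluster id of the home sub-cluster where the AM container runs.
        conf.set(YarnConfiguration.RM_CLUSTER_ID, "SC-home");
        return conf;
      }
    }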

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c29cf65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
index e734bdd..72e5f53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c29cf65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
new file mode 100644
index 0000000..3b564f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the BaseAMRMProxyTest and overrides methods in order to use the
+ * AMRMProxyService's pipeline test cases for testing the FederationInterceptor
+ * class. The tests for AMRMProxyService have been written cleverly so that they
+ * can be reused to validate different request interceptor chains.
+ */
+public class TestFederationInterceptor extends BaseAMRMProxyTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationInterceptor.class);
+
+  public static final String HOME_SC_ID = "SC-home";
+
+  private TestableFederationInterceptor interceptor;
+
+  private int testAppId;
+  private ApplicationAttemptId attemptId;
+
+  @Override
+  public void setUp() throws IOException {
+    super.setUp();
+    interceptor = new TestableFederationInterceptor();
+
+    testAppId = 1;
+    attemptId = getApplicationAttemptId(testAppId);
+    interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(),
+        attemptId, "test-user", null, null));
+  }
+
+  @Override
+  public void tearDown() {
+    interceptor.shutdown();
+    super.tearDown();
+  }
+
+  @Override
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    String mockPassThroughInterceptorClass =
+        PassThroughRequestInterceptor.class.getName();
+
+    // Create a request interceptor pipeline for testing. The last one in the
+    // chain is the federation interceptor that calls the mock resource manager.
+    // The others in the chain will simply forward the request to the next one
+    // in the chain.
+    conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+            + "," + TestableFederationInterceptor.class.getName());
+
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, HOME_SC_ID);
+
+    return conf;
+  }
+
+  @Test
+  public void testRequestInterceptorChainCreation() throws Exception {
+    RequestInterceptor root =
+        super.getAMRMProxyService().createRequestInterceptorChain();
+    int index = 0;
+    while (root != null) {
+      switch (index) {
+      case 0:
+      case 1:
+        Assert.assertEquals(PassThroughRequestInterceptor.class.getName(),
+            root.getClass().getName());
+        break;
+      case 2:
+        Assert.assertEquals(TestableFederationInterceptor.class.getName(),
+            root.getClass().getName());
+        break;
+      default:
+        Assert.fail();
+      }
+      root = root.getNextInterceptor();
+      index++;
+    }
+    Assert.assertEquals("The number of interceptors in chain does not match",
+        Integer.toString(3), Integer.toString(index));
+  }
+
+  /**
+   * Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior,
+   * so that when AM registers more than once, it returns the same register
+   * success response instead of throwing
+   * {@link InvalidApplicationMasterRequestException}.
+   *
+   * We do this because FederationInterceptor can receive concurrent register
+   * requests from the AM because of the timeout between the AM and AMRMProxy.
+   * This is possible since the timeout between FederationInterceptor and the
+   * RM is longer, because of performFailover plus the timeout.
+   */
+  @Test
+  public void testTwoIdenticalRegisterRequest() throws Exception {
+    // Register the application twice
+    RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    for (int i = 0; i < 2; i++) {
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerReq);
+      Assert.assertNotNull(registerResponse);
+    }
+  }
+
+  @Test
+  public void testTwoDifferentRegisterRequest() throws Exception {
+    // Register the application first time
+    RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    RegisterApplicationMasterResponse registerResponse =
+        interceptor.registerApplicationMaster(registerReq);
+    Assert.assertNotNull(registerResponse);
+
+    // Register the application second time with a different request obj
+    registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("different");
+    try {
+      registerResponse = interceptor.registerApplicationMaster(registerReq);
+      Assert.fail("Should throw if a different request obj is used");
+    } catch (YarnException e) {
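+      // Expected: registering with a different request body must be rejected.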
+    }
+  }
+}
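
Note: the two register tests above capture the contract described in the
FederationInterceptor Javadoc: an identical duplicate register call succeeds
with the cached response, while a modified request is rejected. A minimal
AM-side sketch of what this enables (the retry count and sleep are assumptions
for illustration, not part of this patch):

    import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;

    final class AmRegisterRetrySketch {
      static RegisterApplicationMasterResponse registerWithRetry(
          ApplicationMasterProtocol amrmProxy,
          RegisterApplicationMasterRequest request) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < 3; attempt++) {
          try {
            // FederationInterceptor caches the first successful response and
            // returns it again for an identical duplicate register request,
            // so a timed-out attempt can be retried with the same request.
            return amrmProxy.registerApplicationMaster(request);
          } catch (Exception e) {
            last = e;
            Thread.sleep(1000L);
          }
        }
        throw last;
      }
    }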

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c29cf65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
new file mode 100644
index 0000000..0ca7488
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
+import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
+import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
+
+/**
+ * Extends the FederationInterceptor and overrides methods to provide a
+ * testable implementation of FederationInterceptor.
+ */
+public class TestableFederationInterceptor extends FederationInterceptor {
+  private ConcurrentHashMap<String, MockResourceManagerFacade>
+      secondaryResourceManagers = new ConcurrentHashMap<>();
+  private AtomicInteger runningIndex = new AtomicInteger(0);
+  private MockResourceManagerFacade mockRm;
+
+  @Override
+  protected ApplicationMasterProtocol createHomeRMProxy(
+      AMRMProxyApplicationContext appContext) {
+    synchronized (this) {
+      if (mockRm == null) {
+        mockRm = new MockResourceManagerFacade(
+            new YarnConfiguration(super.getConf()), 0);
+      }
+    }
+    return mockRm;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected <T> T createSecondaryRMProxy(Class<T> proxyClass,
+      Configuration conf, String subClusterId) throws IOException {
+    // We create one instance of the mock resource manager per sub-cluster. Keep
+    // track of the instances of the RMs in the map keyed by the sub-cluster id.
+    synchronized (this.secondaryResourceManagers) {
+      if (this.secondaryResourceManagers.containsKey(subClusterId)) {
+        return (T) this.secondaryResourceManagers.get(subClusterId);
+      } else {
+        // The running index here is used to simulate different RM_EPOCH to
+        // generate unique container identifiers in a federation environment
+        MockResourceManagerFacade rm = new MockResourceManagerFacade(
+            new Configuration(conf), runningIndex.addAndGet(10000));
+        this.secondaryResourceManagers.put(subClusterId, rm);
+        return (T) rm;
+      }
+    }
+  }
+
+  protected void setShouldReRegisterNext() {
+    if (mockRm != null) {
+      mockRm.setShouldReRegisterNext();
+    }
+    for (MockResourceManagerFacade subCluster : secondaryResourceManagers
+        .values()) {
+      subCluster.setShouldReRegisterNext();
+    }
+  }
+
+  /**
+   * Extends the UnmanagedAMPoolManager and overrides methods to provide a
+   * testable implementation of UnmanagedAMPoolManager.
+   */
+  protected class TestableUnmanagedAMPoolManager
+      extends UnmanagedAMPoolManager {
+    public TestableUnmanagedAMPoolManager(ExecutorService threadpool) {
+      super(threadpool);
+    }
+
+    @Override
+    public UnmanagedApplicationManager createUAM(Configuration conf,
+        ApplicationId appId, String queueName, String submitter,
+        String appNameSuffix) {
+      return new TestableUnmanagedApplicationManager(conf, appId, queueName,
+          submitter, appNameSuffix);
+    }
+  }
+
+  /**
+   * Extends the UnmanagedApplicationManager and overrides methods to provide a
+   * testable implementation.
+   */
+  protected class TestableUnmanagedApplicationManager
+      extends UnmanagedApplicationManager {
+
+    public TestableUnmanagedApplicationManager(Configuration conf,
+        ApplicationId appId, String queueName, String submitter,
+        String appNameSuffix) {
+      super(conf, appId, queueName, submitter, appNameSuffix);
+    }
+
+    /**
+     * We override this method here to return a mock RM instance. The base
+     * class returns the proxy to the real RM, which will not work in
+     * standalone test cases.
+     */
+    @Override
+    protected <T> T createRMProxy(Class<T> protocol, Configuration config,
+        UserGroupInformation user, Token<AMRMTokenIdentifier> token)
+        throws IOException {
+      return createSecondaryRMProxy(protocol, config,
+          YarnConfiguration.getClusterId(config));
+    }
+  }
+}
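
Note: TestableFederationInterceptor swaps in MockResourceManagerFacade for both
the home and the secondary RMs, so a test can drive the register/allocate flow
without a real cluster. Below is a sketch of a possible follow-up heartbeat test
for TestFederationInterceptor (a hypothetical addition, not part of this patch;
what the mock RM actually allocates is defined by MockResourceManagerFacade,
which lives outside this diff):

    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
    import org.apache.hadoop.yarn.util.Records;
    import org.junit.Assert;
    import org.junit.Test;

    @Test
    public void testAllocateHeartbeatSketch() throws Exception {
      // Register once, exactly as the existing tests do.
      RegisterApplicationMasterRequest registerReq =
          Records.newRecord(RegisterApplicationMasterRequest.class);
      registerReq.setHost(Integer.toString(testAppId));
      registerReq.setRpcPort(testAppId);
      registerReq.setTrackingUrl("");
      Assert.assertNotNull(interceptor.registerApplicationMaster(registerReq));

      // One heartbeat with an empty ask; the mock home RM answers it and
      // FederationInterceptor merges the response and caches any allocated
      // containers per sub-cluster.
      AllocateRequest allocateReq = Records.newRecord(AllocateRequest.class);
      allocateReq.setResponseId(0);
      AllocateResponse allocateResp = interceptor.allocate(allocateReq);
      Assert.assertNotNull(allocateResp);
    }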

