YARN-2885. Create AMRMProxy request interceptor and ContainerAllocator to 
distribute OPPORTUNISTIC containers to appropriate Nodes (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b00875e6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b00875e6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b00875e6

Branch: refs/heads/yarn-2877
Commit: b00875e602c6f2b7e03ff613552936ef749f5012
Parents: fd6a6da
Author: Arun Suresh <asur...@apache.org>
Authored: Thu Feb 11 08:57:58 2016 -0800
Committer: Arun Suresh <asur...@apache.org>
Committed: Thu Feb 11 09:04:25 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES-yarn-2877.txt       |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  42 ++
 .../yarn/api/records/impl/pb/ProtoUtils.java    |  15 +
 .../hadoop-yarn-server-common/pom.xml           |   1 +
 .../api/DistributedSchedulerProtocol.java       |  78 ++++
 .../api/DistributedSchedulerProtocolPB.java     |  36 ++
 .../hadoop/yarn/server/api/ServerRMProxy.java   |   4 +
 ...istributedSchedulerProtocolPBClientImpl.java | 151 +++++++
 ...stributedSchedulerProtocolPBServiceImpl.java | 143 ++++++
 .../DistSchedAllocateResponse.java              |  58 +++
 .../DistSchedRegisterResponse.java              | 102 +++++
 .../pb/DistSchedAllocateResponsePBImpl.java     | 180 ++++++++
 .../pb/DistSchedRegisterResponsePBImpl.java     | 304 +++++++++++++
 .../proto/distributed_scheduler_protocol.proto  |  38 ++
 .../yarn_server_common_service_protos.proto     |  15 +
 .../hadoop/yarn/server/nodemanager/Context.java |   5 +
 .../yarn/server/nodemanager/NodeManager.java    |  36 +-
 .../nodemanager/amrmproxy/AMRMProxyService.java |   8 +
 .../amrmproxy/AbstractRequestInterceptor.java   |  42 ++
 .../amrmproxy/DefaultRequestInterceptor.java    |  88 +++-
 .../amrmproxy/RequestInterceptor.java           |   4 +-
 .../nodemanager/scheduler/LocalScheduler.java   | 437 +++++++++++++++++++
 .../OpportunisticContainerAllocator.java        | 185 ++++++++
 .../security/NMTokenSecretManagerInNM.java      |  24 +-
 .../yarn/server/nodemanager/TestEventFlow.java  |   2 +-
 .../nodemanager/TestNodeStatusUpdater.java      |   4 +-
 .../amrmproxy/BaseAMRMProxyTest.java            |  10 +
 .../BaseContainerManagerTest.java               |   2 +-
 .../TestContainerManagerRecovery.java           |   2 +-
 .../launcher/TestContainerLaunch.java           |   2 +-
 .../TestLocalCacheDirectoryManager.java         |   3 +-
 .../TestResourceLocalizationService.java        |   4 +-
 .../scheduler/TestLocalScheduler.java           | 212 +++++++++
 .../webapp/TestContainerLogsPage.java           |   6 +-
 .../nodemanager/webapp/TestNMWebServer.java     |   4 +-
 .../nodemanager/webapp/TestNMWebServices.java   |   2 +-
 .../webapp/TestNMWebServicesApps.java           |   2 +-
 .../webapp/TestNMWebServicesContainers.java     |   2 +-
 .../ApplicationMasterService.java               |  39 +-
 .../DistributedSchedulingService.java           | 162 +++++++
 .../server/resourcemanager/ResourceManager.java |  10 +
 .../scheduler/AppSchedulingInfo.java            |   3 +-
 .../yarn/server/resourcemanager/MockRM.java     |  17 +
 .../TestDistributedSchedulingService.java       | 170 ++++++++
 44 files changed, 2615 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/CHANGES-yarn-2877.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES-yarn-2877.txt 
b/hadoop-yarn-project/CHANGES-yarn-2877.txt
index d29ff0f..a147866 100644
--- a/hadoop-yarn-project/CHANGES-yarn-2877.txt
+++ b/hadoop-yarn-project/CHANGES-yarn-2877.txt
@@ -13,3 +13,6 @@ yarn-2877 distributed scheduling (Unreleased)
     YARN-4335. Allow ResourceRequests to specify ExecutionType of a request
     ask (kkaranasos via asuresh)
 
+    YARN-2885. Create AMRMProxy request interceptor and ContainerAllocator
+    to distribute OPPORTUNISTIC containers to appropriate Nodes (asuresh)
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d84c155..edae3eb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -291,6 +291,48 @@ public class YarnConfiguration extends Configuration {
   /** ACL used in case none is found. Allows nothing. */
   public static final String DEFAULT_YARN_APP_ACL = " ";
 
+  /** Is Distributed Scheduling Enabled. */
+  public static String DIST_SCHEDULING_ENABLED =
+      YARN_PREFIX + "distributed-scheduling.enabled";
+  public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;
+
+  /** Mininum allocatable container memory for Distributed Scheduling. */
+  public static String DIST_SCHEDULING_MIN_MEMORY =
+      YARN_PREFIX + "distributed-scheduling.min-memory";
+  public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512;
+
+  /** Mininum allocatable container vcores for Distributed Scheduling. */
+  public static String DIST_SCHEDULING_MIN_VCORES =
+      YARN_PREFIX + "distributed-scheduling.min-vcores";
+  public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1;
+
+  /** Maximum allocatable container memory for Distributed Scheduling. */
+  public static String DIST_SCHEDULING_MAX_MEMORY =
+      YARN_PREFIX + "distributed-scheduling.max-memory";
+  public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048;
+
+  /** Maximum allocatable container vcores for Distributed Scheduling. */
+  public static String DIST_SCHEDULING_MAX_VCORES =
+      YARN_PREFIX + "distributed-scheduling.max-vcores";
+  public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4;
+
+  /** Incremental allocatable container memory for Distributed Scheduling. */
+  public static String DIST_SCHEDULING_INCR_MEMORY =
+      YARN_PREFIX + "distributed-scheduling.incr-memory";
+  public static final int DIST_SCHEDULING_INCR_MEMORY_DEFAULT = 512;
+
+  /** Incremental allocatable container vcores for Distributed Scheduling. */
+  public static String DIST_SCHEDULING_INCR_VCORES =
+      YARN_PREFIX + "distributed-scheduling.incr-vcores";
+  public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;
+
+  /** Container token expiry for container allocated via Distributed
+   * Scheduling. */
+  public static String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
+      YARN_PREFIX + "distributed-scheduling.container-token-expiry";
+  public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
+      600000;
+
   /**
    * Enable/disable intermediate-data encryption at YARN level. For now, this
    * only is used by the FileSystemRMStateStore to setup right file-system

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 29ed0f3..9d683f1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -37,8 +37,10 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
 import 
org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
@@ -294,4 +296,17 @@ public class ProtoUtils {
   public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) {
     return ExecutionType.valueOf(e.name());
   }
+
+  /*
+   * Resource
+   */
+  public static synchronized YarnProtos.ResourceProto convertToProtoFormat(
+      Resource r) {
+    return ((ResourcePBImpl) r).getProto();
+  }
+
+  public static Resource convertFromProtoFormat(
+      YarnProtos.ResourceProto resource) {
+    return new ResourcePBImpl(resource);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 2958b81..3a6bddd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -142,6 +142,7 @@
               <source>
                 <directory>${basedir}/src/main/proto</directory>
                 <includes>
+                  <include>distributed_scheduler_protocol.proto</include>
                   <include>yarn_server_common_protos.proto</include>
                   <include>yarn_server_common_service_protos.proto</include>
                   <include>yarn_server_common_service_protos.proto</include>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
new file mode 100644
index 0000000..490c25b
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+
+/**
+ * <p>This protocol extends the <code>ApplicationMasterProtocol</code>. It is
+ * used by the <code>LocalScheduler</code> running on the NodeManager to wrap
+ * the request / response objects of the <code>registerApplicationMaster</code>
+ * and <code>allocate</code> methods of the protocol with addition information
+ * required to perform Distributed Scheduling.
+ * </p>
+ */
+public interface DistributedSchedulerProtocol
+    extends ApplicationMasterProtocol {
+
+  /**
+   * <p> Extends the <code>registerApplicationMaster</code> to wrap the 
response
+   * with additional metadata.</p>
+   *
+   * @param request ApplicationMaster registration request
+   * @return A <code>DistSchedRegisterResponse</code> that contains a standard
+   *         AM registration response along with additional information 
required
+   *         for Distributed Scheduling
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Public
+  @Unstable
+  @Idempotent
+  DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
+      RegisterApplicationMasterRequest request)
+      throws YarnException, IOException;
+
+  /**
+   * <p> Extends the <code>allocate</code> to wrap the response with additional
+   * metadata.</p>
+   *
+   * @param request ApplicationMaster allocate request
+   * @return A <code>DistSchedAllocateResponse</code> that contains a standard
+   *         AM allocate response along with additional information required
+   *         for Distributed Scheduling
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Public
+  @Unstable
+  @Idempotent
+  DistSchedAllocateResponse allocateForDistributedScheduling(
+      AllocateRequest request) throws YarnException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
new file mode 100644
index 0000000..413b9c9
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import 
org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
+
+import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol;
+
+@Private
+@Unstable
+@ProtocolInfo(protocolName = 
"org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB",
+    protocolVersion = 1)
+public interface DistributedSchedulerProtocolPB extends
+    
DistributedSchedulerProtocol.DistributedSchedulerProtocolService.BlockingInterface,
+    ApplicationMasterProtocolService.BlockingInterface {
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
index 2d4085f..c23e27c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
@@ -78,6 +78,10 @@ public class ServerRMProxy<T> extends RMProxy<T> {
         YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+    } else if (protocol == DistributedSchedulerProtocol.class) {
+      return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
     } else {
       String message = "Unsupported protocol found when creating the proxy " +
           "connection to ResourceManager: " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
new file mode 100644
index 0000000..c1dd9e5
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+
+
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class DistributedSchedulerProtocolPBClientImpl implements
+    DistributedSchedulerProtocol, Closeable {
+
+  private DistributedSchedulerProtocolPB proxy;
+
+  public DistributedSchedulerProtocolPBClientImpl(long clientVersion,
+      InetSocketAddress addr,
+      Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+        ProtobufRpcEngine.class);
+    proxy = RPC.getProxy(DistributedSchedulerProtocolPB.class, clientVersion,
+        addr, conf);
+  }
+
+  @Override
+  public void close() {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+  }
+
+  @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
+        ((RegisterApplicationMasterRequestPBImpl) request).getProto();
+    try {
+      return new DistSchedRegisterResponsePBImpl(
+          proxy.registerApplicationMasterForDistributedScheduling(
+              null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    YarnServiceProtos.AllocateRequestProto requestProto =
+        ((AllocateRequestPBImpl) request).getProto();
+    try {
+      return new DistSchedAllocateResponsePBImpl(
+          proxy.allocateForDistributedScheduling(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
+        ((RegisterApplicationMasterRequestPBImpl) request).getProto();
+    try {
+      return new RegisterApplicationMasterResponsePBImpl(
+          proxy.registerApplicationMaster(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster
+      (FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    YarnServiceProtos.FinishApplicationMasterRequestProto requestProto =
+        ((FinishApplicationMasterRequestPBImpl) request).getProto();
+    try {
+      return new FinishApplicationMasterResponsePBImpl(
+          proxy.finishApplicationMaster(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request) throws
+      YarnException, IOException {
+    YarnServiceProtos.AllocateRequestProto requestProto =
+        ((AllocateRequestPBImpl) request).getProto();
+    try {
+      return new AllocateResponsePBImpl(proxy.allocate(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
new file mode 100644
index 0000000..8be2893
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .FinishApplicationMasterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .FinishApplicationMasterResponsePBImpl;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
+import 
org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
+
+import java.io.IOException;
+
+public class DistributedSchedulerProtocolPBServiceImpl implements
+    DistributedSchedulerProtocolPB {
+
+  private DistributedSchedulerProtocol real;
+
+  public DistributedSchedulerProtocolPBServiceImpl(
+      DistributedSchedulerProtocol impl) {
+    this.real = impl;
+  }
+
+  @Override
+  public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
+  registerApplicationMasterForDistributedScheduling(RpcController controller,
+      RegisterApplicationMasterRequestProto proto) throws
+      ServiceException {
+    RegisterApplicationMasterRequestPBImpl request = new
+        RegisterApplicationMasterRequestPBImpl(proto);
+    try {
+      DistSchedRegisterResponse response =
+          real.registerApplicationMasterForDistributedScheduling(request);
+      return ((DistSchedRegisterResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
+  allocateForDistributedScheduling(RpcController controller,
+      AllocateRequestProto proto) throws ServiceException {
+    AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
+    try {
+      DistSchedAllocateResponse response = real
+          .allocateForDistributedScheduling(request);
+      return ((DistSchedAllocateResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public YarnServiceProtos.AllocateResponseProto allocate(RpcController arg0,
+      AllocateRequestProto proto) throws ServiceException {
+    AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
+    try {
+      AllocateResponse response = real.allocate(request);
+      return ((AllocateResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public YarnServiceProtos.FinishApplicationMasterResponseProto
+  finishApplicationMaster(
+      RpcController arg0, YarnServiceProtos
+      .FinishApplicationMasterRequestProto proto)
+      throws ServiceException {
+    FinishApplicationMasterRequestPBImpl request = new
+        FinishApplicationMasterRequestPBImpl(proto);
+    try {
+      FinishApplicationMasterResponse response = real.finishApplicationMaster
+          (request);
+      return ((FinishApplicationMasterResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public YarnServiceProtos.RegisterApplicationMasterResponseProto
+  registerApplicationMaster(
+      RpcController arg0, RegisterApplicationMasterRequestProto proto)
+      throws ServiceException {
+    RegisterApplicationMasterRequestPBImpl request = new
+        RegisterApplicationMasterRequestPBImpl(proto);
+    try {
+      RegisterApplicationMasterResponse response = real
+          .registerApplicationMaster(request);
+      return ((RegisterApplicationMasterResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
new file mode 100644
index 0000000..5f6e069
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+@Public
+@Unstable
+public abstract class DistSchedAllocateResponse {
+
+  @Public
+  @Unstable
+  public static DistSchedAllocateResponse newInstance(AllocateResponse
+      allResp) {
+    DistSchedAllocateResponse response =
+        Records.newRecord(DistSchedAllocateResponse.class);
+    response.setAllocateResponse(allResp);
+    return  response;
+  }
+
+  @Public
+  @Unstable
+  public abstract void setAllocateResponse(AllocateResponse response);
+
+  @Public
+  @Unstable
+  public abstract AllocateResponse getAllocateResponse();
+
+  @Public
+  @Unstable
+  public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
+
+  @Public
+  @Unstable
+  public abstract List<NodeId> getNodesForScheduling();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
new file mode 100644
index 0000000..e4e5138
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+@Public
+@Unstable
+public abstract class DistSchedRegisterResponse {
+
+  @Public
+  @Unstable
+  public static DistSchedRegisterResponse newInstance
+      (RegisterApplicationMasterResponse regAMResp) {
+    DistSchedRegisterResponse response =
+        Records.newRecord(DistSchedRegisterResponse.class);
+    response.setRegisterResponse(regAMResp);
+    return response;
+  }
+
+  @Public
+  @Unstable
+  public abstract void setRegisterResponse(
+      RegisterApplicationMasterResponse resp);
+
+  @Public
+  @Unstable
+  public abstract RegisterApplicationMasterResponse getRegisterResponse();
+
+  @Public
+  @Unstable
+  public abstract void setMinAllocatableCapabilty(Resource minResource);
+
+  @Public
+  @Unstable
+  public abstract Resource getMinAllocatableCapabilty();
+
+  @Public
+  @Unstable
+  public abstract void setMaxAllocatableCapabilty(Resource maxResource);
+
+  @Public
+  @Unstable
+  public abstract Resource getMaxAllocatableCapabilty();
+
+  @Public
+  @Unstable
+  public abstract void setIncrAllocatableCapabilty(Resource maxResource);
+
+  @Public
+  @Unstable
+  public abstract Resource getIncrAllocatableCapabilty();
+
+  @Public
+  @Unstable
+  public abstract void setContainerTokenExpiryInterval(int interval);
+
+  @Public
+  @Unstable
+  public abstract int getContainerTokenExpiryInterval();
+
+  @Public
+  @Unstable
+  public abstract void setContainerIdStart(long containerIdStart);
+
+  @Public
+  @Unstable
+  public abstract long getContainerIdStart();
+
+  @Public
+  @Unstable
+  public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
+
+  @Public
+  @Unstable
+  public abstract List<NodeId> getNodesForScheduling();
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
new file mode 100644
index 0000000..3ea4965
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+    .DistSchedAllocateResponse;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse 
{
+
+  YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto =
+      
YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.getDefaultInstance();
+  YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.Builder builder 
= null;
+  boolean viaProto = false;
+
+  private AllocateResponse allocateResponse;
+  private List<NodeId> nodesForScheduling;
+
+  public DistSchedAllocateResponsePBImpl() {
+    builder = 
YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder();
+  }
+
+  public 
DistSchedAllocateResponsePBImpl(YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
 proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto 
getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = 
YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private synchronized void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private synchronized void mergeLocalToBuilder() {
+    if (this.nodesForScheduling != null) {
+      builder.clearNodesForScheduling();
+      Iterable<YarnProtos.NodeIdProto> iterable =
+          getNodeIdProtoIterable(this.nodesForScheduling);
+      builder.addAllNodesForScheduling(iterable);
+    }
+    if (this.allocateResponse != null) {
+      builder.setAllocateResponse(
+          ((AllocateResponsePBImpl)this.allocateResponse).getProto());
+    }
+  }
+  @Override
+  public void setAllocateResponse(AllocateResponse response) {
+    maybeInitBuilder();
+    if(allocateResponse == null) {
+      builder.clearAllocateResponse();
+    }
+    this.allocateResponse = response;
+  }
+
+  @Override
+  public AllocateResponse getAllocateResponse() {
+    if (this.allocateResponse != null) {
+      return this.allocateResponse;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasAllocateResponse()) {
+      return null;
+    }
+
+    this.allocateResponse =
+        new AllocateResponsePBImpl(p.getAllocateResponse());
+    return this.allocateResponse;
+  }
+
+  @Override
+  public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+    maybeInitBuilder();
+    if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+      if (this.nodesForScheduling != null) {
+        this.nodesForScheduling.clear();
+      }
+      builder.clearNodesForScheduling();
+      return;
+    }
+    this.nodesForScheduling = new ArrayList<>();
+    this.nodesForScheduling.addAll(nodesForScheduling);
+  }
+
+  @Override
+  public List<NodeId> getNodesForScheduling() {
+    if (nodesForScheduling != null) {
+      return nodesForScheduling;
+    }
+    initLocalNodesForSchedulingList();
+    return nodesForScheduling;
+  }
+
+  private synchronized void initLocalNodesForSchedulingList() {
+    YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+    nodesForScheduling = new ArrayList<>();
+    if (list != null) {
+      for (YarnProtos.NodeIdProto t : list) {
+        nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+      }
+    }
+  }
+  private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
+      final List<NodeId> nodeList) {
+    maybeInitBuilder();
+    return new Iterable<YarnProtos.NodeIdProto>() {
+      @Override
+      public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
+        return new Iterator<YarnProtos.NodeIdProto>() {
+
+          Iterator<NodeId> iter = nodeList.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public YarnProtos.NodeIdProto next() {
+            return ProtoUtils.convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
new file mode 100644
index 0000000..0322c70
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import com.google.protobuf.TextFormat;
+
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+    .DistSchedRegisterResponse;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse 
{
+
+  YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto =
+      
YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.getDefaultInstance();
+  YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.Builder builder 
= null;
+  boolean viaProto = false;
+
+  private Resource maxAllocatableCapability;
+  private Resource minAllocatableCapability;
+  private Resource incrAllocatableCapability;
+  private List<NodeId> nodesForScheduling;
+  private RegisterApplicationMasterResponse registerApplicationMasterResponse;
+
+  public DistSchedRegisterResponsePBImpl() {
+    builder = 
YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder();
+  }
+
+  public 
DistSchedRegisterResponsePBImpl(YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
 proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto 
getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = 
YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private synchronized void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private synchronized void mergeLocalToBuilder() {
+    if (this.nodesForScheduling != null) {
+      builder.clearNodesForScheduling();
+      Iterable<YarnProtos.NodeIdProto> iterable =
+          getNodeIdProtoIterable(this.nodesForScheduling);
+      builder.addAllNodesForScheduling(iterable);
+    }
+    if (this.maxAllocatableCapability != null) {
+      builder.setMaxAllocCapability(
+          ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
+    }
+    if (this.minAllocatableCapability != null) {
+      builder.setMaxAllocCapability(
+          ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
+    }
+    if (this.registerApplicationMasterResponse != null) {
+      builder.setRegisterResponse(
+          ((RegisterApplicationMasterResponsePBImpl)
+              this.registerApplicationMasterResponse).getProto());
+    }
+  }
+
+  @Override
+  public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
+    maybeInitBuilder();
+    if(registerApplicationMasterResponse == null) {
+      builder.clearRegisterResponse();
+    }
+    this.registerApplicationMasterResponse = resp;
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse getRegisterResponse() {
+    if (this.registerApplicationMasterResponse != null) {
+      return this.registerApplicationMasterResponse;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = 
viaProto ? proto : builder;
+    if (!p.hasRegisterResponse()) {
+      return null;
+    }
+
+    this.registerApplicationMasterResponse =
+        new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse());
+    return this.registerApplicationMasterResponse;
+  }
+
+  @Override
+  public void setMaxAllocatableCapabilty(Resource maxResource) {
+    maybeInitBuilder();
+    if(maxAllocatableCapability == null) {
+      builder.clearMaxAllocCapability();
+    }
+    this.maxAllocatableCapability = maxResource;
+  }
+
+  @Override
+  public Resource getMaxAllocatableCapabilty() {
+    if (this.maxAllocatableCapability != null) {
+      return this.maxAllocatableCapability;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = 
viaProto ? proto : builder;
+    if (!p.hasMaxAllocCapability()) {
+      return null;
+    }
+
+    this.maxAllocatableCapability =
+        ProtoUtils.convertFromProtoFormat(p.getMaxAllocCapability());
+    return this.maxAllocatableCapability;
+  }
+
+  @Override
+  public void setMinAllocatableCapabilty(Resource minResource) {
+    maybeInitBuilder();
+    if(minAllocatableCapability == null) {
+      builder.clearMinAllocCapability();
+    }
+    this.minAllocatableCapability = minResource;
+  }
+
+  @Override
+  public Resource getMinAllocatableCapabilty() {
+    if (this.minAllocatableCapability != null) {
+      return this.minAllocatableCapability;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = 
viaProto ? proto : builder;
+    if (!p.hasMinAllocCapability()) {
+      return null;
+    }
+
+    this.minAllocatableCapability =
+        ProtoUtils.convertFromProtoFormat(p.getMinAllocCapability());
+    return this.minAllocatableCapability;
+  }
+
+  @Override
+  public void setIncrAllocatableCapabilty(Resource incrResource) {
+    maybeInitBuilder();
+    if(incrAllocatableCapability == null) {
+      builder.clearIncrAllocCapability();
+    }
+    this.incrAllocatableCapability = incrResource;
+  }
+
+  @Override
+  public Resource getIncrAllocatableCapabilty() {
+    if (this.incrAllocatableCapability != null) {
+      return this.incrAllocatableCapability;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = 
viaProto ? proto : builder;
+    if (!p.hasIncrAllocCapability()) {
+      return null;
+    }
+
+    this.incrAllocatableCapability =
+        ProtoUtils.convertFromProtoFormat(p.getIncrAllocCapability());
+    return this.incrAllocatableCapability;
+  }
+
+  @Override
+  public void setContainerTokenExpiryInterval(int interval) {
+    maybeInitBuilder();
+    builder.setContainerTokenExpiryInterval(interval);
+  }
+
+  @Override
+  public int getContainerTokenExpiryInterval() {
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = 
viaProto ? proto : builder;
+    if (!p.hasContainerTokenExpiryInterval()) {
+      return 0;
+    }
+    return p.getContainerTokenExpiryInterval();
+  }
+
+  @Override
+  public void setContainerIdStart(long containerIdStart) {
+    maybeInitBuilder();
+    builder.setContainerIdStart(containerIdStart);
+  }
+
+  @Override
+  public long getContainerIdStart() {
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = 
viaProto ? proto : builder;
+    if (!p.hasContainerIdStart()) {
+      return 0;
+    }
+    return p.getContainerIdStart();
+  }
+
+
+  @Override
+  public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+    maybeInitBuilder();
+    if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+      if (this.nodesForScheduling != null) {
+        this.nodesForScheduling.clear();
+      }
+      builder.clearNodesForScheduling();
+      return;
+    }
+    this.nodesForScheduling = new ArrayList<>();
+    this.nodesForScheduling.addAll(nodesForScheduling);
+  }
+
+  @Override
+  public List<NodeId> getNodesForScheduling() {
+    if (nodesForScheduling != null) {
+      return nodesForScheduling;
+    }
+    initLocalNodesForSchedulingList();
+    return nodesForScheduling;
+  }
+
+  private synchronized void initLocalNodesForSchedulingList() {
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = 
viaProto ? proto : builder;
+    List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+    nodesForScheduling = new ArrayList<>();
+    if (list != null) {
+      for (YarnProtos.NodeIdProto t : list) {
+        nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+      }
+    }
+  }
+  private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
+      final List<NodeId> nodeList) {
+    maybeInitBuilder();
+    return new Iterable<YarnProtos.NodeIdProto>() {
+      @Override
+      public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
+        return new Iterator<YarnProtos.NodeIdProto>() {
+
+          Iterator<NodeId> iter = nodeList.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public YarnProtos.NodeIdProto next() {
+            return ProtoUtils.convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
new file mode 100644
index 0000000..7e3a77f
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are public and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "DistributedSchedulerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_service_protos.proto";
+import "yarn_server_common_service_protos.proto";
+
+
+service DistributedSchedulerProtocolService {
+  rpc registerApplicationMasterForDistributedScheduling 
(RegisterApplicationMasterRequestProto) returns 
(DistSchedRegisterResponseProto);
+  rpc allocateForDistributedScheduling (AllocateRequestProto) returns 
(DistSchedAllocateResponseProto);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index a54bbdb..786d8ee 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -26,6 +26,21 @@ import "yarn_protos.proto";
 import "yarn_server_common_protos.proto";
 import "yarn_service_protos.proto";
 
+message DistSchedRegisterResponseProto {
+  optional RegisterApplicationMasterResponseProto register_response = 1;
+  optional ResourceProto max_alloc_capability = 2;
+  optional ResourceProto min_alloc_capability = 3;
+  optional ResourceProto incr_alloc_capability = 4;
+  optional int32 container_token_expiry_interval = 5;
+  optional int64 container_id_start = 6;
+  repeated NodeIdProto nodes_for_scheduling = 7;
+}
+
+message DistSchedAllocateResponseProto {
+  optional AllocateResponseProto allocate_response = 1;
+  repeated NodeIdProto nodes_for_scheduling = 2;
+}
+
 message NodeLabelsProto {
   repeated NodeLabelProto nodeLabels = 1;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 9c2d1fb..e0a4da4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -32,6 +32,7 @@ import 
org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import 
org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -87,4 +88,8 @@ public interface Context {
 
   ConcurrentLinkedQueue<LogAggregationReport>
       getLogAggregationStatusForApps();
+
+  boolean isDistributedSchedulingEnabled();
+
+  OpportunisticContainerAllocator getContainerAllocator();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index a9a5411..ef7b760 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -69,6 +69,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabel
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import 
org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
@@ -187,9 +188,9 @@ public class NodeManager extends CompositeService
   protected NMContext createNMContext(
       NMContainerTokenSecretManager containerTokenSecretManager,
       NMTokenSecretManagerInNM nmTokenSecretManager,
-      NMStateStoreService stateStore) {
+      NMStateStoreService stateStore, boolean isDistSchedulerEnabled) {
     return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
-        dirsHandler, aclsManager, stateStore);
+        dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled);
   }
 
   protected void doSecureLogin() throws IOException {
@@ -310,8 +311,12 @@ public class NodeManager extends CompositeService
             getNodeHealthScriptRunner(conf), dirsHandler);
     addService(nodeHealthChecker);
 
+    boolean isDistSchedulingEnabled =
+        conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+            YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
+
     this.context = createNMContext(containerTokenSecretManager,
-        nmTokenSecretManager, nmStore);
+        nmTokenSecretManager, nmStore, isDistSchedulingEnabled);
 
     nodeLabelsProvider = createNodeLabelsProvider(conf);
 
@@ -340,6 +345,10 @@ public class NodeManager extends CompositeService
     addService(webServer);
     ((NMContext) context).setWebServer(webServer);
 
+    ((NMContext) context).setQueueableContainerAllocator(
+        new OpportunisticContainerAllocator(nodeStatusUpdater, context,
+            webServer.getPort()));
+
     dispatcher.register(ContainerManagerEventType.class, containerManager);
     dispatcher.register(NodeManagerEventType.class, this);
     addService(dispatcher);
@@ -458,11 +467,14 @@ public class NodeManager extends CompositeService
     private boolean isDecommissioned = false;
     private final ConcurrentLinkedQueue<LogAggregationReport>
         logAggregationReportForApps;
+    private final boolean isDistSchedulingEnabled;
+
+    private OpportunisticContainerAllocator containerAllocator;
 
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager 
aclsManager,
-        NMStateStoreService stateStore) {
+        NMStateStoreService stateStore, boolean isDistSchedulingEnabled) {
       this.containerTokenSecretManager = containerTokenSecretManager;
       this.nmTokenSecretManager = nmTokenSecretManager;
       this.dirsHandler = dirsHandler;
@@ -473,6 +485,7 @@ public class NodeManager extends CompositeService
       this.stateStore = stateStore;
       this.logAggregationReportForApps = new ConcurrentLinkedQueue<
           LogAggregationReport>();
+      this.isDistSchedulingEnabled = isDistSchedulingEnabled;
     }
 
     /**
@@ -585,6 +598,21 @@ public class NodeManager extends CompositeService
         getLogAggregationStatusForApps() {
       return this.logAggregationReportForApps;
     }
+
+    @Override
+    public boolean isDistributedSchedulingEnabled() {
+      return isDistSchedulingEnabled;
+    }
+
+    public void setQueueableContainerAllocator(
+        OpportunisticContainerAllocator containerAllocator) {
+      this.containerAllocator = containerAllocator;
+    }
+
+    @Override
+    public OpportunisticContainerAllocator getContainerAllocator() {
+      return containerAllocator;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index bd6538c..67bb52b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
@@ -464,6 +466,12 @@ public class AMRMProxyService extends AbstractService 
implements
       interceptorClassNames.add(item.trim());
     }
 
+    // Make sure LocalScheduler is present at the beginning
+    // of the chain..
+    if (this.nmContext.isDistributedSchedulingEnabled()) {
+      interceptorClassNames.add(0, LocalScheduler.class.getName());
+    }
+
     return interceptorClassNames;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
index 810dfa8..2b2a2f6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -21,6 +21,14 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+
+import java.io.IOException;
 
 /**
  * Implements the RequestInterceptor interface and provides common 
functionality
@@ -99,4 +107,38 @@ public abstract class AbstractRequestInterceptor implements
   public AMRMProxyApplicationContext getApplicationContext() {
     return this.appContext;
   }
+
+  /**
+   * Default implementation that invokes the distributed scheduling version
+   * of the register method.
+   *
+   * @param request ApplicationMaster allocate request
+   * @return Distribtued Scheduler Allocate Response
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    return (this.nextInterceptor != null) ?
+        this.nextInterceptor.allocateForDistributedScheduling(request) : null;
+  }
+
+  /**
+   * Default implementation that invokes the distributed scheduling version
+   * of the allocate method.
+   *
+   * @param request ApplicationMaster registration request
+   * @return Distributed Scheduler Register Response
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return (this.nextInterceptor != null) ? this.nextInterceptor
+        .registerApplicationMasterForDistributedScheduling(request) : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 2c7939b..5e10d03 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -20,10 +20,15 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 
+import com.google.common.base.Joiner;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -33,9 +38,16 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+    .DistSchedAllocateResponse;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +60,7 @@ public final class DefaultRequestInterceptor extends
     AbstractRequestInterceptor {
   private static final Logger LOG = LoggerFactory
       .getLogger(DefaultRequestInterceptor.class);
-  private ApplicationMasterProtocol rmClient;
+  private DistributedSchedulerProtocol rmClient;
   private UserGroupInformation user = null;
 
   @Override
@@ -63,11 +75,12 @@ public final class DefaultRequestInterceptor extends
       final Configuration conf = this.getConf();
 
       rmClient =
-          user.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() 
{
+          user.doAs(new 
PrivilegedExceptionAction<DistributedSchedulerProtocol>() {
             @Override
-            public ApplicationMasterProtocol run() throws Exception {
-              return ClientRMProxy.createRMProxy(conf,
-                  ApplicationMasterProtocol.class);
+            public DistributedSchedulerProtocol run() throws Exception {
+              setAMRMTokenService(conf);
+              return ServerRMProxy.createRMProxy(conf,
+                  DistributedSchedulerProtocol.class);
             }
           });
     } catch (IOException e) {
@@ -108,6 +121,32 @@ public final class DefaultRequestInterceptor extends
   }
 
   @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
+        "request to the real YARN RM");
+    return rmClient.registerApplicationMasterForDistributedScheduling(request);
+  }
+
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Forwarding allocateForDistributedScheduling request" +
+          "to the real YARN RM");
+    }
+    DistSchedAllocateResponse allocateResponse =
+        rmClient.allocateForDistributedScheduling(request);
+    if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
+      updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+    }
+
+    return allocateResponse;
+  }
+
+  @Override
   public FinishApplicationMasterResponse finishApplicationMaster(
       final FinishApplicationMasterRequest request) throws YarnException,
       IOException {
@@ -135,4 +174,43 @@ public final class DefaultRequestInterceptor extends
     user.addToken(amrmToken);
     amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
   }
+
+  private static void setAMRMTokenService(final Configuration conf)
+      throws IOException {
+    for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> 
token : UserGroupInformation
+        .getCurrentUser().getTokens()) {
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        token.setService(getAMRMTokenService(conf));
+      }
+    }
+  }
+
+  @InterfaceStability.Unstable
+  public static Text getAMRMTokenService(Configuration conf) {
+    return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+  }
+
+  @InterfaceStability.Unstable
+  public static Text getTokenService(Configuration conf, String address,
+      String defaultAddr, int defaultPort) {
+    if (HAUtil.isHAEnabled(conf)) {
+      // Build a list of service addresses to form the service name
+      ArrayList<String> services = new ArrayList<String>();
+      YarnConfiguration yarnConf = new YarnConfiguration(conf);
+      for (String rmId : HAUtil.getRMHAIds(conf)) {
+        // Set RM_ID to get the corresponding RM_ADDRESS
+        yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
+        services.add(SecurityUtil.buildTokenService(
+            yarnConf.getSocketAddr(address, defaultAddr, defaultPort))
+            .toString());
+      }
+      return new Text(Joiner.on(',').join(services));
+    }
+
+    // Non-HA case - no need to set RM_ID
+    return SecurityUtil.buildTokenService(conf.getSocketAddr(address,
+        defaultAddr, defaultPort));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
index c74c88f..7a73563 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
@@ -19,14 +19,14 @@
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
 
 /**
  * Defines the contract to be implemented by the request intercepter classes,
  * that can be used to intercept and inspect messages sent from the application
  * master to the resource manager.
  */
-public interface RequestInterceptor extends ApplicationMasterProtocol,
+public interface RequestInterceptor extends DistributedSchedulerProtocol,
     Configurable {
   /**
    * This method is called for initializing the intercepter. This is guaranteed

Reply via email to