YARN-2885. Create AMRMProxy request interceptor and ContainerAllocator to distribute OPPORTUNISTIC containers to appropriate Nodes (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b00875e6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b00875e6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b00875e6 Branch: refs/heads/yarn-2877 Commit: b00875e602c6f2b7e03ff613552936ef749f5012 Parents: fd6a6da Author: Arun Suresh <asur...@apache.org> Authored: Thu Feb 11 08:57:58 2016 -0800 Committer: Arun Suresh <asur...@apache.org> Committed: Thu Feb 11 09:04:25 2016 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES-yarn-2877.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 42 ++ .../yarn/api/records/impl/pb/ProtoUtils.java | 15 + .../hadoop-yarn-server-common/pom.xml | 1 + .../api/DistributedSchedulerProtocol.java | 78 ++++ .../api/DistributedSchedulerProtocolPB.java | 36 ++ .../hadoop/yarn/server/api/ServerRMProxy.java | 4 + ...istributedSchedulerProtocolPBClientImpl.java | 151 +++++++ ...stributedSchedulerProtocolPBServiceImpl.java | 143 ++++++ .../DistSchedAllocateResponse.java | 58 +++ .../DistSchedRegisterResponse.java | 102 +++++ .../pb/DistSchedAllocateResponsePBImpl.java | 180 ++++++++ .../pb/DistSchedRegisterResponsePBImpl.java | 304 +++++++++++++ .../proto/distributed_scheduler_protocol.proto | 38 ++ .../yarn_server_common_service_protos.proto | 15 + .../hadoop/yarn/server/nodemanager/Context.java | 5 + .../yarn/server/nodemanager/NodeManager.java | 36 +- .../nodemanager/amrmproxy/AMRMProxyService.java | 8 + .../amrmproxy/AbstractRequestInterceptor.java | 42 ++ .../amrmproxy/DefaultRequestInterceptor.java | 88 +++- .../amrmproxy/RequestInterceptor.java | 4 +- .../nodemanager/scheduler/LocalScheduler.java | 437 +++++++++++++++++++ .../OpportunisticContainerAllocator.java | 185 ++++++++ .../security/NMTokenSecretManagerInNM.java | 24 +- .../yarn/server/nodemanager/TestEventFlow.java | 2 +- .../nodemanager/TestNodeStatusUpdater.java | 4 +- 
.../amrmproxy/BaseAMRMProxyTest.java | 10 + .../BaseContainerManagerTest.java | 2 +- .../TestContainerManagerRecovery.java | 2 +- .../launcher/TestContainerLaunch.java | 2 +- .../TestLocalCacheDirectoryManager.java | 3 +- .../TestResourceLocalizationService.java | 4 +- .../scheduler/TestLocalScheduler.java | 212 +++++++++ .../webapp/TestContainerLogsPage.java | 6 +- .../nodemanager/webapp/TestNMWebServer.java | 4 +- .../nodemanager/webapp/TestNMWebServices.java | 2 +- .../webapp/TestNMWebServicesApps.java | 2 +- .../webapp/TestNMWebServicesContainers.java | 2 +- .../ApplicationMasterService.java | 39 +- .../DistributedSchedulingService.java | 162 +++++++ .../server/resourcemanager/ResourceManager.java | 10 + .../scheduler/AppSchedulingInfo.java | 3 +- .../yarn/server/resourcemanager/MockRM.java | 17 + .../TestDistributedSchedulingService.java | 170 ++++++++ 44 files changed, 2615 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/CHANGES-yarn-2877.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES-yarn-2877.txt b/hadoop-yarn-project/CHANGES-yarn-2877.txt index d29ff0f..a147866 100644 --- a/hadoop-yarn-project/CHANGES-yarn-2877.txt +++ b/hadoop-yarn-project/CHANGES-yarn-2877.txt @@ -13,3 +13,6 @@ yarn-2877 distributed scheduling (Unreleased) YARN-4335. Allow ResourceRequests to specify ExecutionType of a request ask (kkaranasos via asuresh) + YARN-2885. 
Create AMRMProxy request interceptor and ContainerAllocator + to distribute OPPORTUNISTIC containers to appropriate Nodes (asuresh) + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d84c155..edae3eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -291,6 +291,48 @@ public class YarnConfiguration extends Configuration { /** ACL used in case none is found. Allows nothing. */ public static final String DEFAULT_YARN_APP_ACL = " "; + /** Is Distributed Scheduling Enabled. */ + public static String DIST_SCHEDULING_ENABLED = + YARN_PREFIX + "distributed-scheduling.enabled"; + public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false; + + /** Minimum allocatable container memory for Distributed Scheduling. */ + public static String DIST_SCHEDULING_MIN_MEMORY = + YARN_PREFIX + "distributed-scheduling.min-memory"; + public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512; + + /** Minimum allocatable container vcores for Distributed Scheduling. */ + public static String DIST_SCHEDULING_MIN_VCORES = + YARN_PREFIX + "distributed-scheduling.min-vcores"; + public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1; + + /** Maximum allocatable container memory for Distributed Scheduling. 
*/ + public static String DIST_SCHEDULING_MAX_MEMORY = + YARN_PREFIX + "distributed-scheduling.max-memory"; + public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048; + + /** Maximum allocatable container vcores for Distributed Scheduling. */ + public static String DIST_SCHEDULING_MAX_VCORES = + YARN_PREFIX + "distributed-scheduling.max-vcores"; + public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4; + + /** Incremental allocatable container memory for Distributed Scheduling. */ + public static String DIST_SCHEDULING_INCR_MEMORY = + YARN_PREFIX + "distributed-scheduling.incr-memory"; + public static final int DIST_SCHEDULING_INCR_MEMORY_DEFAULT = 512; + + /** Incremental allocatable container vcores for Distributed Scheduling. */ + public static String DIST_SCHEDULING_INCR_VCORES = + YARN_PREFIX + "distributed-scheduling.incr-vcores"; + public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1; + + /** Container token expiry for container allocated via Distributed + * Scheduling. */ + public static String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS = + YARN_PREFIX + "distributed-scheduling.container-token-expiry"; + public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT = + 600000; + /** * Enable/disable intermediate-data encryption at YARN level. 
For now, this * only is used by the FileSystemRMStateStore to setup right file-system http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 29ed0f3..9d683f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -37,8 +37,10 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; @@ -294,4 +296,17 @@ public class ProtoUtils { public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) { return ExecutionType.valueOf(e.name()); } + + /* + * Resource + */ + public static synchronized YarnProtos.ResourceProto convertToProtoFormat( + Resource r) { + return ((ResourcePBImpl) r).getProto(); + } + + public static Resource convertFromProtoFormat( + 
YarnProtos.ResourceProto resource) { + return new ResourcePBImpl(resource); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 2958b81..3a6bddd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -142,6 +142,7 @@ <source> <directory>${basedir}/src/main/proto</directory> <includes> + <include>distributed_scheduler_protocol.proto</include> <include>yarn_server_common_protos.proto</include> <include>yarn_server_common_service_protos.proto</include> <include>yarn_server_common_service_protos.proto</include> http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java new file mode 100644 index 0000000..490c25b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.retry.Idempotent; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; + +/** + * <p>This protocol extends the <code>ApplicationMasterProtocol</code>. It is + * used by the <code>LocalScheduler</code> running on the NodeManager to wrap + * the request / response objects of the <code>registerApplicationMaster</code> + * and <code>allocate</code> methods of the protocol with additional information + * required to perform Distributed Scheduling. 
+ * </p> + */ +public interface DistributedSchedulerProtocol + extends ApplicationMasterProtocol { + + /** + * <p> Extends the <code>registerApplicationMaster</code> to wrap the response + * with additional metadata.</p> + * + * @param request ApplicationMaster registration request + * @return A <code>DistSchedRegisterResponse</code> that contains a standard + * AM registration response along with additional information required + * for Distributed Scheduling + * @throws YarnException + * @throws IOException + */ + @Public + @Unstable + @Idempotent + DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) + throws YarnException, IOException; + + /** + * <p> Extends the <code>allocate</code> to wrap the response with additional + * metadata.</p> + * + * @param request ApplicationMaster allocate request + * @return A <code>DistSchedAllocateResponse</code> that contains a standard + * AM allocate response along with additional information required + * for Distributed Scheduling + * @throws YarnException + * @throws IOException + */ + @Public + @Unstable + @Idempotent + DistSchedAllocateResponse allocateForDistributedScheduling( + AllocateRequest request) throws YarnException, IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java new file mode 100644 index 0000000..413b9c9 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; + + +import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol; + +@Private +@Unstable +@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB", + protocolVersion = 1) +public interface DistributedSchedulerProtocolPB extends + DistributedSchedulerProtocol.DistributedSchedulerProtocolService.BlockingInterface, + ApplicationMasterProtocolService.BlockingInterface { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java index 2d4085f..c23e27c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java @@ -78,6 +78,10 @@ public class ServerRMProxy<T> extends RMProxy<T> { YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); + } else if (protocol == DistributedSchedulerProtocol.class) { + return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); } else { String message = "Unsupported protocol found when creating the proxy " + "connection to ResourceManager: " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java new file mode 
100644 index 0000000..c1dd9e5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.api.impl.pb.client; + +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; + + +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .FinishApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .FinishApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .RegisterApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .RegisterApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; + +import java.io.Closeable; 
+import java.io.IOException; +import java.net.InetSocketAddress; + +public class DistributedSchedulerProtocolPBClientImpl implements + DistributedSchedulerProtocol, Closeable { + + private DistributedSchedulerProtocolPB proxy; + + public DistributedSchedulerProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, + Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class, + ProtobufRpcEngine.class); + proxy = RPC.getProxy(DistributedSchedulerProtocolPB.class, clientVersion, + addr, conf); + } + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + + @Override + public DistSchedRegisterResponse + registerApplicationMasterForDistributedScheduling + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto = + ((RegisterApplicationMasterRequestPBImpl) request).getProto(); + try { + return new DistSchedRegisterResponsePBImpl( + proxy.registerApplicationMasterForDistributedScheduling( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public DistSchedAllocateResponse allocateForDistributedScheduling + (AllocateRequest request) throws YarnException, IOException { + YarnServiceProtos.AllocateRequestProto requestProto = + ((AllocateRequestPBImpl) request).getProto(); + try { + return new DistSchedAllocateResponsePBImpl( + proxy.allocateForDistributedScheduling(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto = + ((RegisterApplicationMasterRequestPBImpl) request).getProto(); + try { + return 
new RegisterApplicationMasterResponsePBImpl( + proxy.registerApplicationMaster(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster + (FinishApplicationMasterRequest request) throws YarnException, + IOException { + YarnServiceProtos.FinishApplicationMasterRequestProto requestProto = + ((FinishApplicationMasterRequestPBImpl) request).getProto(); + try { + return new FinishApplicationMasterResponsePBImpl( + proxy.finishApplicationMaster(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public AllocateResponse allocate(AllocateRequest request) throws + YarnException, IOException { + YarnServiceProtos.AllocateRequestProto requestProto = + ((AllocateRequestPBImpl) request).getProto(); + try { + return new AllocateResponsePBImpl(proxy.allocate(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java new file mode 100644 index 0000000..8be2893 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.api.impl.pb.service; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.api.protocolrecords + .FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .FinishApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .FinishApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; + +import java.io.IOException; + +public class DistributedSchedulerProtocolPBServiceImpl implements + DistributedSchedulerProtocolPB { + + private 
DistributedSchedulerProtocol real; + + public DistributedSchedulerProtocolPBServiceImpl( + DistributedSchedulerProtocol impl) { + this.real = impl; + } + + @Override + public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto + registerApplicationMasterForDistributedScheduling(RpcController controller, + RegisterApplicationMasterRequestProto proto) throws + ServiceException { + RegisterApplicationMasterRequestPBImpl request = new + RegisterApplicationMasterRequestPBImpl(proto); + try { + DistSchedRegisterResponse response = + real.registerApplicationMasterForDistributedScheduling(request); + return ((DistSchedRegisterResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto + allocateForDistributedScheduling(RpcController controller, + AllocateRequestProto proto) throws ServiceException { + AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto); + try { + DistSchedAllocateResponse response = real + .allocateForDistributedScheduling(request); + return ((DistSchedAllocateResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public YarnServiceProtos.AllocateResponseProto allocate(RpcController arg0, + AllocateRequestProto proto) throws ServiceException { + AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto); + try { + AllocateResponse response = real.allocate(request); + return ((AllocateResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public YarnServiceProtos.FinishApplicationMasterResponseProto + finishApplicationMaster( + RpcController arg0, YarnServiceProtos + 
.FinishApplicationMasterRequestProto proto) + throws ServiceException { + FinishApplicationMasterRequestPBImpl request = new + FinishApplicationMasterRequestPBImpl(proto); + try { + FinishApplicationMasterResponse response = real.finishApplicationMaster + (request); + return ((FinishApplicationMasterResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public YarnServiceProtos.RegisterApplicationMasterResponseProto + registerApplicationMaster( + RpcController arg0, RegisterApplicationMasterRequestProto proto) + throws ServiceException { + RegisterApplicationMasterRequestPBImpl request = new + RegisterApplicationMasterRequestPBImpl(proto); + try { + RegisterApplicationMasterResponse response = real + .registerApplicationMaster(request); + return ((RegisterApplicationMasterResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java new file mode 100644 index 0000000..5f6e069 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java @@ -0,0 +1,58 
@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+/**
+ * Record that pairs a regular {@link AllocateResponse} with a list of
+ * {@link NodeId}s offered for scheduling.
+ */
+@Public
+@Unstable
+public abstract class DistSchedAllocateResponse {
+
+  /**
+   * Creates a new record via {@link Records#newRecord(Class)} and stores the
+   * given allocate response in it.
+   *
+   * @param allResp the allocate response to wrap
+   * @return the newly created record
+   */
+  @Public
+  @Unstable
+  public static DistSchedAllocateResponse newInstance(
+      AllocateResponse allResp) {
+    final DistSchedAllocateResponse record =
+        Records.newRecord(DistSchedAllocateResponse.class);
+    record.setAllocateResponse(allResp);
+    return record;
+  }
+
+  /**
+   * Sets the wrapped allocate response.
+   *
+   * @param response the allocate response to carry in this record
+   */
+  @Public
+  @Unstable
+  public abstract void setAllocateResponse(AllocateResponse response);
+
+  /**
+   * @return the wrapped allocate response
+   */
+  @Public
+  @Unstable
+  public abstract AllocateResponse getAllocateResponse();
+
+  /**
+   * Sets the nodes carried in this record for scheduling.
+   *
+   * @param nodesForScheduling the node ids to store
+   */
+  @Public
+  @Unstable
+  public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
+
+  /**
+   * @return the node ids carried in this record for scheduling
+   */
+  @Public
+  @Unstable
+  public abstract List<NodeId> getNodesForScheduling();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java new file mode 100644 index 0000000..e4e5138 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+/**
+ * Record that wraps a {@link RegisterApplicationMasterResponse} together
+ * with additional values: minimum, maximum and incremental allocatable
+ * {@link Resource}s, a container token expiry interval, a container id
+ * start value and a list of {@link NodeId}s for scheduling.
+ *
+ * <p>Note: the accessor names spell "Capabilty" (sic). The spelling is part
+ * of the public method names and is kept so existing callers compile.
+ */
+@Public
+@Unstable
+public abstract class DistSchedRegisterResponse {
+
+  /**
+   * Creates a new record via {@link Records#newRecord(Class)} and stores the
+   * given register response in it.
+   *
+   * @param regAMResp the register response to wrap
+   * @return the newly created record
+   */
+  @Public
+  @Unstable
+  public static DistSchedRegisterResponse newInstance(
+      RegisterApplicationMasterResponse regAMResp) {
+    DistSchedRegisterResponse response =
+        Records.newRecord(DistSchedRegisterResponse.class);
+    response.setRegisterResponse(regAMResp);
+    return response;
+  }
+
+  /**
+   * @param resp the register response to carry in this record
+   */
+  @Public
+  @Unstable
+  public abstract void setRegisterResponse(
+      RegisterApplicationMasterResponse resp);
+
+  /**
+   * @return the wrapped register response
+   */
+  @Public
+  @Unstable
+  public abstract RegisterApplicationMasterResponse getRegisterResponse();
+
+  /**
+   * @param minResource the minimum allocatable resource
+   */
+  @Public
+  @Unstable
+  public abstract void setMinAllocatableCapabilty(Resource minResource);
+
+  /**
+   * @return the minimum allocatable resource
+   */
+  @Public
+  @Unstable
+  public abstract Resource getMinAllocatableCapabilty();
+
+  /**
+   * @param maxResource the maximum allocatable resource
+   */
+  @Public
+  @Unstable
+  public abstract void setMaxAllocatableCapabilty(Resource maxResource);
+
+  /**
+   * @return the maximum allocatable resource
+   */
+  @Public
+  @Unstable
+  public abstract Resource getMaxAllocatableCapabilty();
+
+  /**
+   * @param incrResource the incremental allocatable resource
+   */
+  @Public
+  @Unstable
+  public abstract void setIncrAllocatableCapabilty(Resource incrResource);
+
+  /**
+   * @return the incremental allocatable resource
+   */
+  @Public
+  @Unstable
+  public abstract Resource getIncrAllocatableCapabilty();
+
+  /**
+   * @param interval the container token expiry interval
+   *                 (NOTE(review): time unit is not visible here - confirm)
+   */
+  @Public
+  @Unstable
+  public abstract void setContainerTokenExpiryInterval(int interval);
+
+  /**
+   * @return the container token expiry interval
+   */
+  @Public
+  @Unstable
+  public abstract int getContainerTokenExpiryInterval();
+
+  /**
+   * @param containerIdStart the container id start value
+   */
+  @Public
+  @Unstable
+  public abstract void setContainerIdStart(long containerIdStart);
+
+  /**
+   * @return the container id start value
+   */
+  @Public
+  @Unstable
+  public abstract long getContainerIdStart();
+
+  /**
+   * @param nodesForScheduling the node ids to store
+   */
+  @Public
+  @Unstable
+  public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
+
+  /**
+   * @return the node ids carried in this record for scheduling
+   */
+  @Public
+  @Unstable
+  public abstract List<NodeId> getNodesForScheduling();
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
new file mode 100644
index 0000000..3ea4965
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+    .DistSchedAllocateResponse;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Protocol-buffer backed implementation of {@link DistSchedAllocateResponse}.
+ *
+ * <p>State is held either in the immutable {@code proto} (when
+ * {@code viaProto} is true) or in {@code builder} plus the locally cached
+ * {@code allocateResponse} / {@code nodesForScheduling} fields. The cached
+ * fields are folded into the builder by {@link #mergeLocalToBuilder()} when
+ * {@link #getProto()} is called.
+ */
+public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
+
+  YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto =
+      YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
+          .getDefaultInstance();
+  YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.Builder
+      builder = null;
+  boolean viaProto = false;
+
+  // Local caches; null means "not set locally, consult proto/builder".
+  private AllocateResponse allocateResponse;
+  private List<NodeId> nodesForScheduling;
+
+  /** Creates an empty record backed by a fresh builder. */
+  public DistSchedAllocateResponsePBImpl() {
+    builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
+        .newBuilder();
+  }
+
+  /**
+   * Creates a record that reads from the given message.
+   *
+   * @param proto the message backing this record
+   */
+  public DistSchedAllocateResponsePBImpl(
+      YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Folds the locally cached fields into the builder and returns the built
+   * message. After this call the record is in {@code viaProto} mode.
+   *
+   * @return the message representing the current state of this record
+   */
+  public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
+      getProto() {
+    // mergeLocalToProto() always leaves proto freshly built and
+    // viaProto == true, so nothing further needs to be done here.
+    mergeLocalToProto();
+    return proto;
+  }
+
+  /** Ensures {@code builder} is usable and switches out of viaProto mode. */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
+          .newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** Merges local fields into the builder and rebuilds {@code proto}. */
+  private synchronized void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Copies every locally cached, non-null field into the builder. */
+  private synchronized void mergeLocalToBuilder() {
+    if (this.nodesForScheduling != null) {
+      // Replace, rather than append to, whatever the builder already holds.
+      builder.clearNodesForScheduling();
+      Iterable<YarnProtos.NodeIdProto> iterable =
+          getNodeIdProtoIterable(this.nodesForScheduling);
+      builder.addAllNodesForScheduling(iterable);
+    }
+    if (this.allocateResponse != null) {
+      builder.setAllocateResponse(
+          ((AllocateResponsePBImpl) this.allocateResponse).getProto());
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Passing {@code null} also clears the value held by the builder, so a
+   * previously merged response is not returned by
+   * {@link #getAllocateResponse()} afterwards.
+   */
+  @Override
+  public void setAllocateResponse(AllocateResponse response) {
+    maybeInitBuilder();
+    // Test the incoming value, not the old field: otherwise set(null) after
+    // a merge leaves the stale response in the builder.
+    if (response == null) {
+      builder.clearAllocateResponse();
+    }
+    this.allocateResponse = response;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the cached response, or one decoded from the proto/builder, or
+   *         {@code null} if none is set
+   */
+  @Override
+  public AllocateResponse getAllocateResponse() {
+    if (this.allocateResponse != null) {
+      return this.allocateResponse;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasAllocateResponse()) {
+      return null;
+    }
+
+    this.allocateResponse =
+        new AllocateResponsePBImpl(p.getAllocateResponse());
+    return this.allocateResponse;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>A {@code null} or empty argument clears both the local list and the
+   * builder; otherwise the elements are copied into a new local list.
+   */
+  @Override
+  public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+    maybeInitBuilder();
+    if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+      if (this.nodesForScheduling != null) {
+        this.nodesForScheduling.clear();
+      }
+      builder.clearNodesForScheduling();
+      return;
+    }
+    this.nodesForScheduling = new ArrayList<>();
+    this.nodesForScheduling.addAll(nodesForScheduling);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the local list, populated from the proto/builder on first access
+   */
+  @Override
+  public List<NodeId> getNodesForScheduling() {
+    if (nodesForScheduling != null) {
+      return nodesForScheduling;
+    }
+    initLocalNodesForSchedulingList();
+    return nodesForScheduling;
+  }
+
+  /** Decodes the node ids held by the proto/builder into the local list. */
+  private synchronized void initLocalNodesForSchedulingList() {
+    YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+    nodesForScheduling = new ArrayList<>();
+    if (list != null) {
+      for (YarnProtos.NodeIdProto t : list) {
+        nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+      }
+    }
+  }
+
+  /**
+   * Returns a view of {@code nodeList} that converts each element to its
+   * proto form as it is iterated.
+   */
+  private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
+      final List<NodeId> nodeList) {
+    maybeInitBuilder();
+    return new Iterable<YarnProtos.NodeIdProto>() {
+      @Override
+      public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
+        return new Iterator<YarnProtos.NodeIdProto>() {
+
+          Iterator<NodeId> iter = nodeList.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public YarnProtos.NodeIdProto next() {
+            return ProtoUtils.convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java new file mode 100644 index 0000000..0322c70 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import com.google.protobuf.TextFormat;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+    .DistSchedRegisterResponse;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Protocol-buffer backed implementation of {@link DistSchedRegisterResponse}.
+ *
+ * <p>State is held either in the immutable {@code proto} (when
+ * {@code viaProto} is true) or in {@code builder} plus the locally cached
+ * record fields. The cached fields are folded into the builder by
+ * {@link #mergeLocalToBuilder()} when {@link #getProto()} is called.
+ */
+public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
+
+  YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto =
+      YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
+          .getDefaultInstance();
+  YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.Builder
+      builder = null;
+  boolean viaProto = false;
+
+  // Local caches; null means "not set locally, consult proto/builder".
+  private Resource maxAllocatableCapability;
+  private Resource minAllocatableCapability;
+  private Resource incrAllocatableCapability;
+  private List<NodeId> nodesForScheduling;
+  private RegisterApplicationMasterResponse registerApplicationMasterResponse;
+
+  /** Creates an empty record backed by a fresh builder. */
+  public DistSchedRegisterResponsePBImpl() {
+    builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
+        .newBuilder();
+  }
+
+  /**
+   * Creates a record that reads from the given message.
+   *
+   * @param proto the message backing this record
+   */
+  public DistSchedRegisterResponsePBImpl(
+      YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Folds the locally cached fields into the builder and returns the built
+   * message. After this call the record is in {@code viaProto} mode.
+   *
+   * @return the message representing the current state of this record
+   */
+  public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
+      getProto() {
+    // mergeLocalToProto() always leaves proto freshly built and
+    // viaProto == true, so nothing further needs to be done here.
+    mergeLocalToProto();
+    return proto;
+  }
+
+  /** Ensures {@code builder} is usable and switches out of viaProto mode. */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
+          .newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** Merges local fields into the builder and rebuilds {@code proto}. */
+  private synchronized void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Copies every locally cached, non-null field into the builder. */
+  private synchronized void mergeLocalToBuilder() {
+    if (this.nodesForScheduling != null) {
+      // Replace, rather than append to, whatever the builder already holds.
+      builder.clearNodesForScheduling();
+      Iterable<YarnProtos.NodeIdProto> iterable =
+          getNodeIdProtoIterable(this.nodesForScheduling);
+      builder.addAllNodesForScheduling(iterable);
+    }
+    if (this.maxAllocatableCapability != null) {
+      builder.setMaxAllocCapability(
+          ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
+    }
+    if (this.minAllocatableCapability != null) {
+      // Must target the *min* field; writing it through the max setter
+      // overwrote max and left min unserialized.
+      builder.setMinAllocCapability(
+          ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
+    }
+    if (this.incrAllocatableCapability != null) {
+      // Previously never merged, so the increment was lost on getProto().
+      builder.setIncrAllocCapability(
+          ProtoUtils.convertToProtoFormat(this.incrAllocatableCapability));
+    }
+    if (this.registerApplicationMasterResponse != null) {
+      builder.setRegisterResponse(
+          ((RegisterApplicationMasterResponsePBImpl)
+              this.registerApplicationMasterResponse).getProto());
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Passing {@code null} also clears the value held by the builder.
+   */
+  @Override
+  public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
+    maybeInitBuilder();
+    // Test the incoming value, not the old field: otherwise set(null) after
+    // a merge leaves the stale value in the builder.
+    if (resp == null) {
+      builder.clearRegisterResponse();
+    }
+    this.registerApplicationMasterResponse = resp;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the cached response, or one decoded from the proto/builder, or
+   *         {@code null} if none is set
+   */
+  @Override
+  public RegisterApplicationMasterResponse getRegisterResponse() {
+    if (this.registerApplicationMasterResponse != null) {
+      return this.registerApplicationMasterResponse;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasRegisterResponse()) {
+      return null;
+    }
+
+    this.registerApplicationMasterResponse =
+        new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse());
+    return this.registerApplicationMasterResponse;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Passing {@code null} also clears the value held by the builder.
+   */
+  @Override
+  public void setMaxAllocatableCapabilty(Resource maxResource) {
+    maybeInitBuilder();
+    if (maxResource == null) {
+      builder.clearMaxAllocCapability();
+    }
+    this.maxAllocatableCapability = maxResource;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the cached value, or one decoded from the proto/builder, or
+   *         {@code null} if none is set
+   */
+  @Override
+  public Resource getMaxAllocatableCapabilty() {
+    if (this.maxAllocatableCapability != null) {
+      return this.maxAllocatableCapability;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasMaxAllocCapability()) {
+      return null;
+    }
+
+    this.maxAllocatableCapability =
+        ProtoUtils.convertFromProtoFormat(p.getMaxAllocCapability());
+    return this.maxAllocatableCapability;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Passing {@code null} also clears the value held by the builder.
+   */
+  @Override
+  public void setMinAllocatableCapabilty(Resource minResource) {
+    maybeInitBuilder();
+    if (minResource == null) {
+      builder.clearMinAllocCapability();
+    }
+    this.minAllocatableCapability = minResource;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the cached value, or one decoded from the proto/builder, or
+   *         {@code null} if none is set
+   */
+  @Override
+  public Resource getMinAllocatableCapabilty() {
+    if (this.minAllocatableCapability != null) {
+      return this.minAllocatableCapability;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasMinAllocCapability()) {
+      return null;
+    }
+
+    this.minAllocatableCapability =
+        ProtoUtils.convertFromProtoFormat(p.getMinAllocCapability());
+    return this.minAllocatableCapability;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Passing {@code null} also clears the value held by the builder.
+   */
+  @Override
+  public void setIncrAllocatableCapabilty(Resource incrResource) {
+    maybeInitBuilder();
+    if (incrResource == null) {
+      builder.clearIncrAllocCapability();
+    }
+    this.incrAllocatableCapability = incrResource;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the cached value, or one decoded from the proto/builder, or
+   *         {@code null} if none is set
+   */
+  @Override
+  public Resource getIncrAllocatableCapabilty() {
+    if (this.incrAllocatableCapability != null) {
+      return this.incrAllocatableCapability;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasIncrAllocCapability()) {
+      return null;
+    }
+
+    this.incrAllocatableCapability =
+        ProtoUtils.convertFromProtoFormat(p.getIncrAllocCapability());
+    return this.incrAllocatableCapability;
+  }
+
+  /** {@inheritDoc} The value is written straight to the builder. */
+  @Override
+  public void setContainerTokenExpiryInterval(int interval) {
+    maybeInitBuilder();
+    builder.setContainerTokenExpiryInterval(interval);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the stored interval, or 0 if the field is not set
+   */
+  @Override
+  public int getContainerTokenExpiryInterval() {
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasContainerTokenExpiryInterval()) {
+      return 0;
+    }
+    return p.getContainerTokenExpiryInterval();
+  }
+
+  /** {@inheritDoc} The value is written straight to the builder. */
+  @Override
+  public void setContainerIdStart(long containerIdStart) {
+    maybeInitBuilder();
+    builder.setContainerIdStart(containerIdStart);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the stored start value, or 0 if the field is not set
+   */
+  @Override
+  public long getContainerIdStart() {
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasContainerIdStart()) {
+      return 0;
+    }
+    return p.getContainerIdStart();
+  }
+
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>A {@code null} or empty argument clears both the local list and the
+   * builder; otherwise the elements are copied into a new local list.
+   */
+  @Override
+  public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+    maybeInitBuilder();
+    if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+      if (this.nodesForScheduling != null) {
+        this.nodesForScheduling.clear();
+      }
+      builder.clearNodesForScheduling();
+      return;
+    }
+    this.nodesForScheduling = new ArrayList<>();
+    this.nodesForScheduling.addAll(nodesForScheduling);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the local list, populated from the proto/builder on first access
+   */
+  @Override
+  public List<NodeId> getNodesForScheduling() {
+    if (nodesForScheduling != null) {
+      return nodesForScheduling;
+    }
+    initLocalNodesForSchedulingList();
+    return nodesForScheduling;
+  }
+
+  /** Decodes the node ids held by the proto/builder into the local list. */
+  private synchronized void initLocalNodesForSchedulingList() {
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+    nodesForScheduling = new ArrayList<>();
+    if (list != null) {
+      for (YarnProtos.NodeIdProto t : list) {
+        nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+      }
+    }
+  }
+
+  /**
+   * Returns a view of {@code nodeList} that converts each element to its
+   * proto form as it is iterated.
+   */
+  private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
+      final List<NodeId> nodeList) {
+    maybeInitBuilder();
+    return new Iterable<YarnProtos.NodeIdProto>() {
+      @Override
+      public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
+        return new Iterator<YarnProtos.NodeIdProto>() {
+
+          Iterator<NodeId> iter = nodeList.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public YarnProtos.NodeIdProto next() {
+            return ProtoUtils.convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
+  /** @return the short debug text form of {@link #getProto()} */
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto new file mode 100644 index 0000000..7e3a77f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are public and stable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *stable* .proto interface. 
+ */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "DistributedSchedulerProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_service_protos.proto"; +import "yarn_server_common_service_protos.proto"; + + +service DistributedSchedulerProtocolService { + rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto); + rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index a54bbdb..786d8ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -26,6 +26,21 @@ import "yarn_protos.proto"; import "yarn_server_common_protos.proto"; import "yarn_service_protos.proto"; +message DistSchedRegisterResponseProto { + optional RegisterApplicationMasterResponseProto register_response = 1; + optional ResourceProto max_alloc_capability = 2; + optional ResourceProto min_alloc_capability = 3; + optional ResourceProto incr_alloc_capability = 4; + optional int32 container_token_expiry_interval = 5; + optional int64 container_id_start = 6; + repeated NodeIdProto nodes_for_scheduling = 
7; +} + +message DistSchedAllocateResponseProto { + optional AllocateResponseProto allocate_response = 1; + repeated NodeIdProto nodes_for_scheduling = 2; +} + message NodeLabelsProto { repeated NodeLabelProto nodeLabels = 1; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 9c2d1fb..e0a4da4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -87,4 +88,8 @@ public interface Context { ConcurrentLinkedQueue<LogAggregationReport> getLogAggregationStatusForApps(); + + boolean 
isDistributedSchedulingEnabled(); + + OpportunisticContainerAllocator getContainerAllocator(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a9a5411..ef7b760 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabel import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; @@ -187,9 +188,9 @@ public class NodeManager extends CompositeService protected NMContext createNMContext( NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, - NMStateStoreService stateStore) { + 
NMStateStoreService stateStore, boolean isDistSchedulerEnabled) { return new NMContext(containerTokenSecretManager, nmTokenSecretManager, - dirsHandler, aclsManager, stateStore); + dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled); } protected void doSecureLogin() throws IOException { @@ -310,8 +311,12 @@ public class NodeManager extends CompositeService getNodeHealthScriptRunner(conf), dirsHandler); addService(nodeHealthChecker); + boolean isDistSchedulingEnabled = + conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, + YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT); + this.context = createNMContext(containerTokenSecretManager, - nmTokenSecretManager, nmStore); + nmTokenSecretManager, nmStore, isDistSchedulingEnabled); nodeLabelsProvider = createNodeLabelsProvider(conf); @@ -340,6 +345,10 @@ public class NodeManager extends CompositeService addService(webServer); ((NMContext) context).setWebServer(webServer); + ((NMContext) context).setQueueableContainerAllocator( + new OpportunisticContainerAllocator(nodeStatusUpdater, context, + webServer.getPort())); + dispatcher.register(ContainerManagerEventType.class, containerManager); dispatcher.register(NodeManagerEventType.class, this); addService(dispatcher); @@ -458,11 +467,14 @@ public class NodeManager extends CompositeService private boolean isDecommissioned = false; private final ConcurrentLinkedQueue<LogAggregationReport> logAggregationReportForApps; + private final boolean isDistSchedulingEnabled; + + private OpportunisticContainerAllocator containerAllocator; public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, - NMStateStoreService stateStore) { + NMStateStoreService stateStore, boolean isDistSchedulingEnabled) { this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; this.dirsHandler = 
dirsHandler; @@ -473,6 +485,7 @@ public class NodeManager extends CompositeService this.stateStore = stateStore; this.logAggregationReportForApps = new ConcurrentLinkedQueue< LogAggregationReport>(); + this.isDistSchedulingEnabled = isDistSchedulingEnabled; } /** @@ -585,6 +598,21 @@ public class NodeManager extends CompositeService getLogAggregationStatusForApps() { return this.logAggregationReportForApps; } + + @Override + public boolean isDistributedSchedulingEnabled() { + return isDistSchedulingEnabled; + } + + public void setQueueableContainerAllocator( + OpportunisticContainerAllocator containerAllocator) { + this.containerAllocator = containerAllocator; + } + + @Override + public OpportunisticContainerAllocator getContainerAllocator() { + return containerAllocator; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index bd6538c..67bb52b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; 
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; + +import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; @@ -464,6 +466,12 @@ public class AMRMProxyService extends AbstractService implements interceptorClassNames.add(item.trim()); } + // Make sure LocalScheduler is present at the beginning + // of the chain.. + if (this.nmContext.isDistributedSchedulingEnabled()) { + interceptorClassNames.add(0, LocalScheduler.class.getName()); + } + return interceptorClassNames; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java index 810dfa8..2b2a2f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java @@ -21,6 +21,14 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import 
org.apache.hadoop.conf.Configuration; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords + .RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; + +import java.io.IOException; /** * Implements the RequestInterceptor interface and provides common functionality @@ -99,4 +107,38 @@ public abstract class AbstractRequestInterceptor implements public AMRMProxyApplicationContext getApplicationContext() { return this.appContext; } + + /** + * Default implementation that invokes the distributed scheduling version + * of the allocate method. + * + * @param request ApplicationMaster allocate request + * @return Distributed Scheduler Allocate Response + * @throws YarnException + * @throws IOException + */ + @Override + public DistSchedAllocateResponse allocateForDistributedScheduling + (AllocateRequest request) throws YarnException, IOException { + return (this.nextInterceptor != null) ? + this.nextInterceptor.allocateForDistributedScheduling(request) : null; + } + + /** + * Default implementation that invokes the distributed scheduling version + * of the register method. + * + * @param request ApplicationMaster registration request + * @return Distributed Scheduler Register Response + * @throws YarnException + * @throws IOException + */ + @Override + public DistSchedRegisterResponse + registerApplicationMasterForDistributedScheduling + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return (this.nextInterceptor != null) ? 
this.nextInterceptor + .registerApplicationMasterForDistributedScheduling(request) : null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java index 2c7939b..5e10d03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -20,10 +20,15 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import com.google.common.base.Joiner; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -33,9 +38,16 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; +import org.apache.hadoop.yarn.server.api.ServerRMProxy; +import org.apache.hadoop.yarn.server.api.protocolrecords + .DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +60,7 @@ public final class DefaultRequestInterceptor extends AbstractRequestInterceptor { private static final Logger LOG = LoggerFactory .getLogger(DefaultRequestInterceptor.class); - private ApplicationMasterProtocol rmClient; + private DistributedSchedulerProtocol rmClient; private UserGroupInformation user = null; @Override @@ -63,11 +75,12 @@ public final class DefaultRequestInterceptor extends final Configuration conf = this.getConf(); rmClient = - user.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() { + user.doAs(new PrivilegedExceptionAction<DistributedSchedulerProtocol>() { @Override - public ApplicationMasterProtocol run() throws Exception { - return ClientRMProxy.createRMProxy(conf, - ApplicationMasterProtocol.class); + public DistributedSchedulerProtocol run() throws Exception { + setAMRMTokenService(conf); + return ServerRMProxy.createRMProxy(conf, + DistributedSchedulerProtocol.class); } }); } catch (IOException e) { @@ -108,6 +121,32 @@ public final class DefaultRequestInterceptor extends } @Override + public DistSchedRegisterResponse + 
registerApplicationMasterForDistributedScheduling + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" + + "request to the real YARN RM"); + return rmClient.registerApplicationMasterForDistributedScheduling(request); + } + + @Override + public DistSchedAllocateResponse allocateForDistributedScheduling + (AllocateRequest request) throws YarnException, IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocateForDistributedScheduling request" + + "to the real YARN RM"); + } + DistSchedAllocateResponse allocateResponse = + rmClient.allocateForDistributedScheduling(request); + if (allocateResponse.getAllocateResponse().getAMRMToken() != null) { + updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken()); + } + + return allocateResponse; + } + + @Override public FinishApplicationMasterResponse finishApplicationMaster( final FinishApplicationMasterRequest request) throws YarnException, IOException { @@ -135,4 +174,43 @@ public final class DefaultRequestInterceptor extends user.addToken(amrmToken); amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf())); } + + private static void setAMRMTokenService(final Configuration conf) + throws IOException { + for (org.apache.hadoop.security.token.Token<? 
extends TokenIdentifier> token : UserGroupInformation + .getCurrentUser().getTokens()) { + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + token.setService(getAMRMTokenService(conf)); + } + } + } + + @InterfaceStability.Unstable + public static Text getAMRMTokenService(Configuration conf) { + return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + } + + @InterfaceStability.Unstable + public static Text getTokenService(Configuration conf, String address, + String defaultAddr, int defaultPort) { + if (HAUtil.isHAEnabled(conf)) { + // Build a list of service addresses to form the service name + ArrayList<String> services = new ArrayList<String>(); + YarnConfiguration yarnConf = new YarnConfiguration(conf); + for (String rmId : HAUtil.getRMHAIds(conf)) { + // Set RM_ID to get the corresponding RM_ADDRESS + yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); + services.add(SecurityUtil.buildTokenService( + yarnConf.getSocketAddr(address, defaultAddr, defaultPort)) + .toString()); + } + return new Text(Joiner.on(',').join(services)); + } + + // Non-HA case - no need to set RM_ID + return SecurityUtil.buildTokenService(conf.getSocketAddr(address, + defaultAddr, defaultPort)); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00875e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java index 
c74c88f..7a73563 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java @@ -19,14 +19,14 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; /** * Defines the contract to be implemented by the request intercepter classes, * that can be used to intercept and inspect messages sent from the application * master to the resource manager. */ -public interface RequestInterceptor extends ApplicationMasterProtocol, +public interface RequestInterceptor extends DistributedSchedulerProtocol, Configurable { /** * This method is called for initializing the intercepter. This is guaranteed