Repository: hadoop Updated Branches: refs/heads/trunk 49ff54c86 -> 55ae14392
YARN-4117. End to end unit test with mini YARN cluster for AMRMProxy Service. Contributed by Giovanni Matteo Fumarola Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/55ae1439 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/55ae1439 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/55ae1439 Branch: refs/heads/trunk Commit: 55ae1439233e8585d624b2872e1e4753ef63eebb Parents: 49ff54c Author: Jian He <jia...@apache.org> Authored: Sun Mar 27 20:22:12 2016 -0700 Committer: Jian He <jia...@apache.org> Committed: Sun Mar 27 20:22:12 2016 -0700 ---------------------------------------------------------------------- .../nodemanager/amrmproxy/AMRMProxyService.java | 14 +++- .../amrmproxy/DefaultRequestInterceptor.java | 7 ++ .../containermanager/ContainerManagerImpl.java | 49 ++++++++----- .../hadoop/yarn/server/MiniYARNCluster.java | 75 +++++++++++++++++++- 4 files changed, 122 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index bd6538c..038c697 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; @@ -512,6 +513,16 @@ public class AMRMProxyService extends AbstractService implements return null; } + @Private + public InetSocketAddress getBindAddress() { + return this.listenerEndpoint; + } + + @Private + public AMRMProxyTokenSecretManager getSecretManager() { + return this.secretManager; + } + /** * Private class for handling application stop events. * @@ -546,7 +557,8 @@ public class AMRMProxyService extends AbstractService implements * ApplicationAttemptId instances. * */ - private static class RequestInterceptorChainWrapper { + @Private + public static class RequestInterceptorChainWrapper { private RequestInterceptor rootInterceptor; private ApplicationAttemptId applicationAttemptId; http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java index 2c7939b..4457dd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * Extends the AbstractRequestInterceptor class and provides an implementation * that simply forwards the AM requests to the cluster resource manager. @@ -135,4 +137,9 @@ public final class DefaultRequestInterceptor extends user.addToken(amrmToken); amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf())); } + + @VisibleForTesting + public void setRMClient(ApplicationMasterProtocol rmClient) { + this.rmClient = rmClient; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 94d5c1e..8d09aa7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -183,7 +183,7 @@ public class ContainerManagerImpl extends CompositeService implements private final ReadLock readLock; private final WriteLock writeLock; private AMRMProxyService amrmProxyService; - private boolean amrmProxyEnabled = false; + protected boolean amrmProxyEnabled = false; private long waitForContainersOnShutdownMillis; @@ -247,19 +247,7 @@ public class ContainerManagerImpl extends CompositeService implements addService(sharedCacheUploader); dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader); - amrmProxyEnabled = - conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, - YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); - - if (amrmProxyEnabled) { - LOG.info("AMRMProxyService is enabled. " - + "All the AM->RM requests will be intercepted by the proxy"); - this.amrmProxyService = - new AMRMProxyService(this.context, this.dispatcher); - addService(this.amrmProxyService); - } else { - LOG.info("AMRMProxyService is disabled"); - } + createAMRMProxyService(conf); waitForContainersOnShutdownMillis = conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, @@ -272,8 +260,20 @@ public class ContainerManagerImpl extends CompositeService implements recover(); } - public boolean isARMRMProxyEnabled() { - return amrmProxyEnabled; + protected void createAMRMProxyService(Configuration conf) { + this.amrmProxyEnabled = + conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); + + if (amrmProxyEnabled) { + LOG.info("AMRMProxyService is enabled. " + + "All the AM->RM requests will be intercepted by the proxy"); + this.setAMRMProxyService( + new AMRMProxyService(this.context, this.dispatcher)); + addService(this.getAMRMProxyService()); + } else { + LOG.info("AMRMProxyService is disabled"); + } } @SuppressWarnings("unchecked") @@ -810,9 +810,9 @@ public class ContainerManagerImpl extends CompositeService implements // Initialize the AMRMProxy service instance only if the container is of // type AM and if the AMRMProxy service is enabled - if (isARMRMProxyEnabled() && containerTokenIdentifier - .getContainerType().equals(ContainerType.APPLICATION_MASTER)) { - this.amrmProxyService.processApplicationStartRequest(request); + if (amrmProxyEnabled && containerTokenIdentifier.getContainerType() + .equals(ContainerType.APPLICATION_MASTER)) { + this.getAMRMProxyService().processApplicationStartRequest(request); } startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, @@ -1413,4 +1413,15 @@ public class ContainerManagerImpl extends CompositeService implements public Map<String, ByteBuffer> getAuxServiceMetaData() { return this.auxiliaryServices.getMetaData(); } + + @Private + public AMRMProxyService getAMRMProxyService() { + return this.amrmProxyService; + } + + @Private + protected void setAMRMProxyService(AMRMProxyService amrmProxyService) { + this.amrmProxyService = amrmProxyService; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 74b7732..c933736 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -35,21 +35,23 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -61,24 +63,31 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; -import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; @@ -698,6 +707,15 @@ public class MiniYARNCluster extends CompositeService { protected void stopRMProxy() { } }; } + + @Override + protected ContainerManagerImpl createContainerManager(Context context, + ContainerExecutor exec, DeletionService del, + NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, + LocalDirsHandlerService dirsHandler) { + return new CustomContainerManagerImpl(context, exec, del, + nodeStatusUpdater, metrics, dirsHandler); + } } /** @@ -799,4 +817,55 @@ public class MiniYARNCluster extends CompositeService { public int getNumOfResourceManager() { return this.resourceManagers.length; } + + private class CustomContainerManagerImpl extends ContainerManagerImpl { + + public CustomContainerManagerImpl(Context context, ContainerExecutor exec, + DeletionService del, NodeStatusUpdater nodeStatusUpdater, + NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) { + super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler); + } + + @Override + protected void createAMRMProxyService(Configuration conf) { + this.amrmProxyEnabled = + conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); + + if (this.amrmProxyEnabled) { + LOG.info("CustomAMRMProxyService is enabled. " + + "All the AM->RM requests will be intercepted by the proxy"); + AMRMProxyService amrmProxyService = + useRpc ? new AMRMProxyService(getContext(), dispatcher) + : new ShortCircuitedAMRMProxy(getContext(), dispatcher); + this.setAMRMProxyService(amrmProxyService); + addService(this.getAMRMProxyService()); + } else { + LOG.info("CustomAMRMProxyService is disabled"); + } + } + } + + private class ShortCircuitedAMRMProxy extends AMRMProxyService { + + public ShortCircuitedAMRMProxy(Context context, + AsyncDispatcher dispatcher) { + super(context, dispatcher); + } + + @Override + protected void initializePipeline(ApplicationAttemptId applicationAttemptId, + String user, Token<AMRMTokenIdentifier> amrmToken, + Token<AMRMTokenIdentifier> localToken) { + super.initializePipeline(applicationAttemptId, user, amrmToken, + localToken); + RequestInterceptor rt = getPipelines() + .get(applicationAttemptId.getApplicationId()).getRootInterceptor(); + if (rt instanceof DefaultRequestInterceptor) { + ((DefaultRequestInterceptor) rt) + .setRMClient(getResourceManager().getApplicationMasterService()); + } + } + + } }