Repository: hadoop Updated Branches: refs/heads/trunk 90fcb16c0 -> 8831b18c6
Missing file for YARN-4117. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8831b18c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8831b18c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8831b18c Branch: refs/heads/trunk Commit: 8831b18c65c8a61e400346dc308e068381835357 Parents: 90fcb16 Author: Jian He <jia...@apache.org> Authored: Mon Mar 28 09:23:06 2016 -0700 Committer: Jian He <jia...@apache.org> Committed: Mon Mar 28 09:59:47 2016 -0700 ---------------------------------------------------------------------- .../yarn/client/api/impl/TestAMRMProxy.java | 413 +++++++++++++++++++ 1 file changed, 413 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8831b18c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java new file mode 100644 index 0000000..b92538a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java @@ -0,0 +1,413 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; + +public class TestAMRMProxy { + + private static final Log LOG = LogFactory.getLog(TestAMRMProxy.class); + + /* + * This test validates register, allocate and finish of an application through + * the AMRMPRoxy. + */ + @Test(timeout = 60000) + public void testAMRMProxyE2E() throws Exception { + MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1); + YarnClient rmClient = null; + ApplicationMasterProtocol client; + + try { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + cluster.init(conf); + cluster.start(); + final Configuration yarnConf = cluster.getConfig(); + + // the client has to connect to AMRMProxy + + yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); + rmClient = YarnClient.createYarnClient(); + rmClient.init(yarnConf); + rmClient.start(); + + // Submit application + + ApplicationId appId = createApp(rmClient, cluster); + + client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); + + LOG.info("testAMRMProxyE2E - Register Application Master"); + + RegisterApplicationMasterResponse responseRegister = + client.registerApplicationMaster(RegisterApplicationMasterRequest + .newInstance(NetUtils.getHostname(), 1024, "")); + + Assert.assertNotNull(responseRegister); + Assert.assertNotNull(responseRegister.getQueue()); + Assert.assertNotNull(responseRegister.getApplicationACLs()); + Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey()); + Assert + .assertNotNull(responseRegister.getContainersFromPreviousAttempts()); + Assert.assertNotNull(responseRegister.getSchedulerResourceTypes()); + Assert.assertNotNull(responseRegister.getMaximumResourceCapability()); + + RMApp rmApp = + cluster.getResourceManager().getRMContext().getRMApps().get(appId); + Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); + + LOG.info("testAMRMProxyE2E - Allocate Resources Application Master"); + + AllocateRequest request = + createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); + + AllocateResponse allocResponse = client.allocate(request); + Assert.assertNotNull(allocResponse); + Assert.assertEquals(0, allocResponse.getAllocatedContainers().size()); + + request.setAskList(new ArrayList<ResourceRequest>()); + request.setResponseId(request.getResponseId() + 1); + + Thread.sleep(1000); + + // RM should allocate container within 2 calls to allocate() + allocResponse = client.allocate(request); + Assert.assertNotNull(allocResponse); + Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); + + LOG.info("testAMRMPRoxy - Finish Application Master"); + + FinishApplicationMasterResponse responseFinish = + client.finishApplicationMaster(FinishApplicationMasterRequest + .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null)); + + Assert.assertNotNull(responseFinish); + + Thread.sleep(500); + Assert.assertNotEquals(RMAppState.FINISHED, rmApp.getState()); + + } finally { + if (rmClient != null) { + rmClient.stop(); + } + cluster.stop(); + } + } + + /* + * This test validates the token renewal from the AMRMPRoxy. The test verifies + * that the received token it is different from the previous one within 5 + * requests. + */ + @Test(timeout = 60000) + public void testE2ETokenRenewal() throws Exception { + MiniYARNCluster cluster = + new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1); + YarnClient rmClient = null; + ApplicationMasterProtocol client; + + try { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1500); + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1500); + conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 1500); + // RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS should be at least + // RM_AM_EXPIRY_INTERVAL_MS * 1.5 *3 + conf.setInt( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, 6); + cluster.init(conf); + cluster.start(); + final Configuration yarnConf = cluster.getConfig(); + yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); + rmClient = YarnClient.createYarnClient(); + rmClient.init(yarnConf); + rmClient.start(); + + // Submit + + ApplicationId appId = createApp(rmClient, cluster); + + client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); + + client.registerApplicationMaster(RegisterApplicationMasterRequest + .newInstance(NetUtils.getHostname(), 1024, "")); + + LOG.info("testAMRMPRoxy - Allocate Resources Application Master"); + + AllocateRequest request = + createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); + + Token lastToken = null; + AllocateResponse response = null; + + for (int i = 0; i < 5; i++) { + + response = client.allocate(request); + request.setResponseId(request.getResponseId() + 1); + + if (response.getAMRMToken() != null + && !response.getAMRMToken().equals(lastToken)) { + break; + } + + lastToken = response.getAMRMToken(); + + // Time slot to be sure the RM renew the token + Thread.sleep(1500); + + } + + Assert.assertFalse(response.getAMRMToken().equals(lastToken)); + + LOG.info("testAMRMPRoxy - Finish Application Master"); + + client.finishApplicationMaster(FinishApplicationMasterRequest + .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null)); + + } finally { + if (rmClient != null) { + rmClient.stop(); + } + cluster.stop(); + } + } + + /* + * This test validates that an AM cannot register directly to the RM, with the + * token provided by the AMRMProxy. + */ + @Test(timeout = 60000) + public void testE2ETokenSwap() throws Exception { + MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1); + YarnClient rmClient = null; + ApplicationMasterProtocol client; + + try { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + cluster.init(conf); + cluster.start(); + + // the client will connect to the RM with the token provided by AMRMProxy + final Configuration yarnConf = cluster.getConfig(); + rmClient = YarnClient.createYarnClient(); + rmClient.init(yarnConf); + rmClient.start(); + + ApplicationId appId = createApp(rmClient, cluster); + + client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); + + try { + client.registerApplicationMaster(RegisterApplicationMasterRequest + .newInstance(NetUtils.getHostname(), 1024, "")); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue( + e.getMessage().startsWith("Invalid AMRMToken from appattempt_")); + } + + } finally { + if (rmClient != null) { + rmClient.stop(); + } + cluster.stop(); + } + } + + private ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient, + ApplicationId appId, MiniYARNCluster cluster, + final Configuration yarnConf) + throws IOException, InterruptedException, YarnException { + + UserGroupInformation user = null; + + // Get the AMRMToken from AMRMProxy + + ApplicationReport report = rmClient.getApplicationReport(appId); + + user = UserGroupInformation.createProxyUser( + report.getCurrentApplicationAttemptId().toString(), + UserGroupInformation.getCurrentUser()); + + ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster + .getNodeManager(0).getNMContext().getContainerManager(); + + AMRMProxyTokenSecretManager amrmTokenSecretManager = + containerManager.getAMRMProxyService().getSecretManager(); + org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token = + amrmTokenSecretManager + .createAndGetAMRMToken(report.getCurrentApplicationAttemptId()); + + SecurityUtil.setTokenService(token, + containerManager.getAMRMProxyService().getBindAddress()); + user.addToken(token); + + // Start Application Master + + return user + .doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() { + @Override + public ApplicationMasterProtocol run() throws Exception { + return ClientRMProxy.createRMProxy(yarnConf, + ApplicationMasterProtocol.class); + } + }); + } + + private AllocateRequest createAllocateRequest(List<NodeReport> listNode) { + // The test needs AMRMClient to create a real allocate request + AMRMClientImpl<ContainerRequest> amClient = + new AMRMClientImpl<ContainerRequest>(); + + Resource capability = Resource.newInstance(1024, 2); + Priority priority = Priority.newInstance(1); + List<NodeReport> nodeReports = listNode; + String node = nodeReports.get(0).getNodeId().getHost(); + String[] nodes = new String[] { node }; + + ContainerRequest storedContainer1 = + new ContainerRequest(capability, nodes, null, priority); + amClient.addContainerRequest(storedContainer1); + amClient.addContainerRequest(storedContainer1); + + List<ResourceRequest> resourceAsk = new ArrayList<ResourceRequest>(); + for (ResourceRequest rr : amClient.ask) { + resourceAsk.add(rr); + } + + ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest + .newInstance(new ArrayList<String>(), new ArrayList<String>()); + + int responseId = 1; + + return AllocateRequest.newInstance(responseId, 0, resourceAsk, + new ArrayList<ContainerId>(), resourceBlacklistRequest); + } + + private ApplicationId createApp(YarnClient yarnClient, + MiniYARNCluster yarnCluster) throws Exception { + + ApplicationSubmissionContext appContext = + yarnClient.createApplication().getApplicationSubmissionContext(); + ApplicationId appId = appContext.getApplicationId(); + + appContext.setApplicationName("Test"); + + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(0); + appContext.setPriority(pri); + + appContext.setQueue("default"); + + ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext( + Collections.<String, LocalResource> emptyMap(), + new HashMap<String, String>(), Arrays.asList("sleep", "10000"), + new HashMap<String, ByteBuffer>(), null, + new HashMap<ApplicationAccessType, String>()); + appContext.setAMContainerSpec(amContainer); + appContext.setResource(Resource.newInstance(1024, 1)); + + SubmitApplicationRequest appRequest = + Records.newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + + yarnClient.submitApplication(appContext); + + RMAppAttempt appAttempt = null; + while (true) { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + if (appReport + .getYarnApplicationState() == YarnApplicationState.ACCEPTED) { + ApplicationAttemptId attemptId = + appReport.getCurrentApplicationAttemptId(); + appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps() + .get(attemptId.getApplicationId()).getCurrentAppAttempt(); + while (true) { + if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { + break; + } + } + break; + } + } + Thread.sleep(1000); + return appId; + } +}