YARN-3104. Fixed RM to not generate new AMRM tokens on every heartbeat between rolling and activation. Contributed by Jason Lowe
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b1b2298 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b1b2298 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b1b2298 Branch: refs/heads/HDFS-EC Commit: 4b1b2298b1229940eee89fc26cc9a2e2295de013 Parents: b12bede Author: Jian He <jia...@apache.org> Authored: Thu Feb 12 16:02:24 2015 -0800 Committer: Zhe Zhang <z...@apache.org> Committed: Mon Feb 16 10:29:47 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 ++ .../ApplicationMasterService.java | 16 ++++--- .../rmapp/attempt/RMAppAttemptImpl.java | 31 ++++++++++++-- .../yarn/server/resourcemanager/MockAM.java | 28 ++++-------- .../security/TestAMRMTokens.java | 45 ++++++++++++++++++-- 5 files changed, 90 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b1b2298/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b91281e..bb19394 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -573,6 +573,9 @@ Release 2.7.0 - UNRELEASED YARN-1580. Documentation error regarding "container-allocation.expiry-interval-ms" (Brahma Reddy Battula via junping_du) + YARN-3104. Fixed RM to not generate new AMRM tokens on every heartbeat + between rolling and activation. (Jason Lowe via jianhe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b1b2298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 6650cf2..1c7f987 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -588,16 +588,20 @@ public class ApplicationMasterService extends AbstractService implements if (nextMasterKey != null && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier .getKeyId()) { - Token<AMRMTokenIdentifier> amrmToken = - rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( - appAttemptId); - ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken); + RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt; + Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken(); + if (nextMasterKey.getMasterKey().getKeyId() != + appAttemptImpl.getAMRMTokenKeyId()) { + LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back" + + " to application: " + applicationId); + amrmToken = rmContext.getAMRMTokenSecretManager() + .createAndGetAMRMToken(appAttemptId); + appAttemptImpl.setAMRMToken(amrmToken); + } allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() .toString(), amrmToken.getPassword(), amrmToken.getService() .toString())); - LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back" - + " to application: " + applicationId); } /* http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b1b2298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 8d1270a..1a19eee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; import static org.apache.hadoop.yarn.util.StringHelper.pjoin; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -132,6 +134,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private final ApplicationAttemptId applicationAttemptId; private final ApplicationSubmissionContext submissionContext; private Token<AMRMTokenIdentifier> amrmToken = null; + private volatile Integer amrmTokenKeyId = null; private SecretKey clientTokenMasterKey = null; private ConcurrentMap<NodeId, List<ContainerStatus>> @@ -593,11 +596,34 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { this.writeLock.lock(); try { this.amrmToken = lastToken; + this.amrmTokenKeyId = null; } finally { this.writeLock.unlock(); } } + @Private + public int getAMRMTokenKeyId() { + Integer keyId = this.amrmTokenKeyId; + if (keyId == null) { + this.readLock.lock(); + try { + if (this.amrmToken == null) { + throw new YarnRuntimeException("Missing AMRM token for " + + this.applicationAttemptId); + } + keyId = this.amrmToken.decodeIdentifier().getKeyId(); + this.amrmTokenKeyId = keyId; + } catch (IOException e) { + throw new YarnRuntimeException("AMRM token decode error for " + + this.applicationAttemptId, e); + } finally { + this.readLock.unlock(); + } + } + return keyId; + } + @Override public Token<ClientToAMTokenIdentifier> createClientToken(String client) { this.readLock.lock(); @@ -846,9 +872,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { } } - this.amrmToken = - rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( - applicationAttemptId); + setAMRMToken(rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + applicationAttemptId)); } private static class BaseTransition implements http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b1b2298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index a3968f8..e1b8a3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -210,37 +210,25 @@ public class MockAM { List<ResourceRequest> resourceRequest, List<ContainerId> releases) throws Exception { final AllocateRequest req = - AllocateRequest.newInstance(++responseId, 0F, resourceRequest, + AllocateRequest.newInstance(0, 0F, resourceRequest, releases, null); - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(attemptId.toString()); - Token<AMRMTokenIdentifier> token = - context.getRMApps().get(attemptId.getApplicationId()) - .getRMAppAttempt(attemptId).getAMRMToken(); - ugi.addTokenIdentifier(token.decodeIdentifier()); - try { - return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() { - @Override - public AllocateResponse run() throws Exception { - return amRMProtocol.allocate(req); - } - }); - } catch (UndeclaredThrowableException e) { - throw (Exception) e.getCause(); - } + return allocate(req); } public AllocateResponse allocate(AllocateRequest allocateRequest) throws Exception { - final AllocateRequest req = allocateRequest; - req.setResponseId(++responseId); - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(attemptId.toString()); Token<AMRMTokenIdentifier> token = context.getRMApps().get(attemptId.getApplicationId()) .getRMAppAttempt(attemptId).getAMRMToken(); ugi.addTokenIdentifier(token.decodeIdentifier()); + return doAllocateAs(ugi, allocateRequest); + } + + public AllocateResponse doAllocateAs(UserGroupInformation ugi, + final AllocateRequest req) throws Exception { + req.setResponseId(++responseId); try { return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b1b2298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index be833a1..0be72e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -18,11 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,6 +56,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -207,7 +216,6 @@ public class TestAMRMTokens { * * @throws Exception */ - @SuppressWarnings("deprecation") @Test public void testMasterKeyRollOver() throws Exception { @@ -336,21 +344,40 @@ public class TestAMRMTokens { @Test (timeout = 20000) public void testAMRMMasterKeysUpdate() throws Exception { + final AtomicReference<AMRMTokenSecretManager> spySecretMgrRef = + new AtomicReference<AMRMTokenSecretManager>(); MockRM rm = new MockRM(conf) { @Override protected void doSecureLogin() throws IOException { // Skip the login. } + + @Override + protected RMSecretManagerService createRMSecretManagerService() { + return new RMSecretManagerService(conf, rmContext) { + @Override + protected AMRMTokenSecretManager createAMRMTokenSecretManager( + Configuration conf, RMContext rmContext) { + AMRMTokenSecretManager spySecretMgr = spy( + super.createAMRMTokenSecretManager(conf, rmContext)); + spySecretMgrRef.set(spySecretMgr); + return spySecretMgr; + } + }; + } }; rm.start(); MockNM nm = rm.registerNode("127.0.0.1:1234", 8000); RMApp app = rm.submitApp(200); MockAM am = MockRM.launchAndRegisterAM(app, rm, nm); - + AMRMTokenSecretManager spySecretMgr = spySecretMgrRef.get(); // Do allocate. Should not update AMRMToken AllocateResponse response = am.allocate(Records.newRecord(AllocateRequest.class)); Assert.assertNull(response.getAMRMToken()); + Token<AMRMTokenIdentifier> oldToken = rm.getRMContext().getRMApps() + .get(app.getApplicationId()) + .getRMAppAttempt(am.getApplicationAttemptId()).getAMRMToken(); // roll over the master key // Do allocate again. the AM should get the latest AMRMToken @@ -366,8 +393,18 @@ public class TestAMRMTokens { .getRMContext().getAMRMTokenSecretManager().getMasterKey().getMasterKey() .getKeyId()); - // Do allocate again. The master key does not update. - // AM should not update its AMRMToken either + // Do allocate again with the same old token and verify the RM sends + // back the last generated token instead of generating it again. + reset(spySecretMgr); + UserGroupInformation ugi = UserGroupInformation.createUserForTesting( + am.getApplicationAttemptId().toString(), new String[0]); + ugi.addTokenIdentifier(oldToken.decodeIdentifier()); + response = am.doAllocateAs(ugi, Records.newRecord(AllocateRequest.class)); + Assert.assertNotNull(response.getAMRMToken()); + verify(spySecretMgr, never()).createAndGetAMRMToken(isA(ApplicationAttemptId.class)); + + // Do allocate again with the updated token and verify we do not + // receive a new token to use. response = am.allocate(Records.newRecord(AllocateRequest.class)); Assert.assertNull(response.getAMRMToken());