YARN-3104. Fixed RM to not generate new AMRM tokens on every heartbeat between 
rolling and activation. Contributed by Jason Lowe


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b1b2298
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b1b2298
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b1b2298

Branch: refs/heads/HDFS-EC
Commit: 4b1b2298b1229940eee89fc26cc9a2e2295de013
Parents: b12bede
Author: Jian He <jia...@apache.org>
Authored: Thu Feb 12 16:02:24 2015 -0800
Committer: Zhe Zhang <z...@apache.org>
Committed: Mon Feb 16 10:29:47 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../ApplicationMasterService.java               | 16 ++++---
 .../rmapp/attempt/RMAppAttemptImpl.java         | 31 ++++++++++++--
 .../yarn/server/resourcemanager/MockAM.java     | 28 ++++--------
 .../security/TestAMRMTokens.java                | 45 ++++++++++++++++++--
 5 files changed, 90 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b1b2298/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b91281e..bb19394 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -573,6 +573,9 @@ Release 2.7.0 - UNRELEASED
     YARN-1580. Documentation error regarding "container-allocation.expiry-interval-ms" 
     (Brahma Reddy Battula via junping_du)
 
+    YARN-3104. Fixed RM to not generate new AMRM tokens on every heartbeat
+    between rolling and activation. (Jason Lowe via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b1b2298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 6650cf2..1c7f987 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -588,16 +588,20 @@ public class ApplicationMasterService extends AbstractService implements
       if (nextMasterKey != null
           && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
             .getKeyId()) {
-        Token<AMRMTokenIdentifier> amrmToken =
-            rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
-              appAttemptId);
-        ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken);
+        RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt;
+        Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken();
+        if (nextMasterKey.getMasterKey().getKeyId() !=
+            appAttemptImpl.getAMRMTokenKeyId()) {
+          LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
+              + " to application: " + applicationId);
+          amrmToken = rmContext.getAMRMTokenSecretManager()
+              .createAndGetAMRMToken(appAttemptId);
+          appAttemptImpl.setAMRMToken(amrmToken);
+        }
         allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
           .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
             .toString(), amrmToken.getPassword(), amrmToken.getService()
             .toString()));
-        LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
-            + " to application: " + applicationId);
       }
 
       /*

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b1b2298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 8d1270a..1a19eee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -132,6 +134,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   private final ApplicationAttemptId applicationAttemptId;
   private final ApplicationSubmissionContext submissionContext;
   private Token<AMRMTokenIdentifier> amrmToken = null;
+  private volatile Integer amrmTokenKeyId = null;
   private SecretKey clientTokenMasterKey = null;
 
   private ConcurrentMap<NodeId, List<ContainerStatus>>
@@ -593,11 +596,34 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     this.writeLock.lock();
     try {
       this.amrmToken = lastToken;
+      this.amrmTokenKeyId = null;
     } finally {
       this.writeLock.unlock();
     }
   }
 
+  @Private
+  public int getAMRMTokenKeyId() {
+    Integer keyId = this.amrmTokenKeyId;
+    if (keyId == null) {
+      this.readLock.lock();
+      try {
+        if (this.amrmToken == null) {
+          throw new YarnRuntimeException("Missing AMRM token for "
+              + this.applicationAttemptId);
+        }
+        keyId = this.amrmToken.decodeIdentifier().getKeyId();
+        this.amrmTokenKeyId = keyId;
+      } catch (IOException e) {
+        throw new YarnRuntimeException("AMRM token decode error for "
+            + this.applicationAttemptId, e);
+      } finally {
+        this.readLock.unlock();
+      }
+    }
+    return keyId;
+  }
+
   @Override
   public Token<ClientToAMTokenIdentifier> createClientToken(String client) {
     this.readLock.lock();
@@ -846,9 +872,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       }
     }
 
-    this.amrmToken =
-        rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
-          applicationAttemptId);
+    setAMRMToken(rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+        applicationAttemptId));
   }
 
   private static class BaseTransition implements

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b1b2298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index a3968f8..e1b8a3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -210,37 +210,25 @@ public class MockAM {
       List<ResourceRequest> resourceRequest, List<ContainerId> releases)
       throws Exception {
     final AllocateRequest req =
-        AllocateRequest.newInstance(++responseId, 0F, resourceRequest,
+        AllocateRequest.newInstance(0, 0F, resourceRequest,
           releases, null);
-    UserGroupInformation ugi =
-        UserGroupInformation.createRemoteUser(attemptId.toString());
-    Token<AMRMTokenIdentifier> token =
-        context.getRMApps().get(attemptId.getApplicationId())
-          .getRMAppAttempt(attemptId).getAMRMToken();
-    ugi.addTokenIdentifier(token.decodeIdentifier());
-    try {
-      return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
-        @Override
-        public AllocateResponse run() throws Exception {
-          return amRMProtocol.allocate(req);
-        }
-      });
-    } catch (UndeclaredThrowableException e) {
-      throw (Exception) e.getCause();
-    }
+    return allocate(req);
   }
 
   public AllocateResponse allocate(AllocateRequest allocateRequest)
             throws Exception {
-    final AllocateRequest req = allocateRequest;
-    req.setResponseId(++responseId);
-
     UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser(attemptId.toString());
     Token<AMRMTokenIdentifier> token =
         context.getRMApps().get(attemptId.getApplicationId())
             .getRMAppAttempt(attemptId).getAMRMToken();
     ugi.addTokenIdentifier(token.decodeIdentifier());
+    return doAllocateAs(ugi, allocateRequest);
+  }
+
+  public AllocateResponse doAllocateAs(UserGroupInformation ugi,
+      final AllocateRequest req) throws Exception {
+    req.setResponseId(++responseId);
     try {
       return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
         @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b1b2298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
index be833a1..0be72e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
@@ -18,11 +18,18 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,6 +56,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -207,7 +216,6 @@ public class TestAMRMTokens {
    * 
    * @throws Exception
    */
-  @SuppressWarnings("deprecation")
   @Test
   public void testMasterKeyRollOver() throws Exception {
 
@@ -336,21 +344,40 @@ public class TestAMRMTokens {
 
   @Test (timeout = 20000)
   public void testAMRMMasterKeysUpdate() throws Exception {
+    final AtomicReference<AMRMTokenSecretManager> spySecretMgrRef =
+        new AtomicReference<AMRMTokenSecretManager>();
     MockRM rm = new MockRM(conf) {
       @Override
       protected void doSecureLogin() throws IOException {
         // Skip the login.
       }
+
+      @Override
+      protected RMSecretManagerService createRMSecretManagerService() {
+        return new RMSecretManagerService(conf, rmContext) {
+          @Override
+          protected AMRMTokenSecretManager createAMRMTokenSecretManager(
+              Configuration conf, RMContext rmContext) {
+            AMRMTokenSecretManager spySecretMgr = spy(
+                super.createAMRMTokenSecretManager(conf, rmContext));
+            spySecretMgrRef.set(spySecretMgr);
+            return spySecretMgr;
+          }
+        };
+      }
     };
     rm.start();
     MockNM nm = rm.registerNode("127.0.0.1:1234", 8000);
     RMApp app = rm.submitApp(200);
     MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
-
+    AMRMTokenSecretManager spySecretMgr = spySecretMgrRef.get();
     // Do allocate. Should not update AMRMToken
     AllocateResponse response =
         am.allocate(Records.newRecord(AllocateRequest.class));
     Assert.assertNull(response.getAMRMToken());
+    Token<AMRMTokenIdentifier> oldToken = rm.getRMContext().getRMApps()
+        .get(app.getApplicationId())
+        .getRMAppAttempt(am.getApplicationAttemptId()).getAMRMToken();
 
     // roll over the master key
     // Do allocate again. the AM should get the latest AMRMToken
@@ -366,8 +393,18 @@ public class TestAMRMTokens {
       .getRMContext().getAMRMTokenSecretManager().getMasterKey().getMasterKey()
       .getKeyId());
 
-    // Do allocate again. The master key does not update.
-    // AM should not update its AMRMToken either
+    // Do allocate again with the same old token and verify the RM sends
+    // back the last generated token instead of generating it again.
+    reset(spySecretMgr);
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
+        am.getApplicationAttemptId().toString(), new String[0]);
+    ugi.addTokenIdentifier(oldToken.decodeIdentifier());
+    response = am.doAllocateAs(ugi, Records.newRecord(AllocateRequest.class));
+    Assert.assertNotNull(response.getAMRMToken());
+    verify(spySecretMgr, never()).createAndGetAMRMToken(isA(ApplicationAttemptId.class));
+
+    // Do allocate again with the updated token and verify we do not
+    // receive a new token to use.
     response = am.allocate(Records.newRecord(AllocateRequest.class));
     Assert.assertNull(response.getAMRMToken());
 

Reply via email to