Repository: hadoop
Updated Branches:
  refs/heads/trunk 5df7ecb33 -> 0402bada1


YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie). 
Contributed by Jian He


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0402bada
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0402bada
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0402bada

Branch: refs/heads/trunk
Commit: 0402bada1989258ecbfdc437cb339322a1f55a97
Parents: 5df7ecb
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Dec 18 23:28:18 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Dec 18 23:28:18 2014 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../security/DelegationTokenRenewer.java        | 42 +++++++++++--
 .../yarn/server/resourcemanager/MockRM.java     | 13 ++--
 .../security/TestDelegationTokenRenewer.java    | 62 ++++++++++++++++++--
 4 files changed, 103 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0402bada/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8e8dd4f..d0d5c72 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -244,6 +244,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2944. InMemorySCMStore can not be instantiated with 
ReflectionUtils#newInstance.
     (Chris Trezzo via kasha)
 
+    YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie)
+    (Jian He via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0402bada/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index f9be1bd..dfcceb8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -69,7 +69,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
  * Service to renew application delegation tokens.
  */
@@ -94,6 +93,9 @@ public class DelegationTokenRenewer extends AbstractService {
   private ConcurrentMap<ApplicationId, Set<DelegationTokenToRenew>> appTokens =
       new ConcurrentHashMap<ApplicationId, Set<DelegationTokenToRenew>>();
 
+  private ConcurrentMap<Token<?>, DelegationTokenToRenew> allTokens =
+      new ConcurrentHashMap<Token<?>, DelegationTokenToRenew>();
+
   private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
       new ConcurrentHashMap<ApplicationId, Long>();
 
@@ -202,6 +204,7 @@ public class DelegationTokenRenewer extends AbstractService 
{
       renewalTimer.cancel();
     }
     appTokens.clear();
+    allTokens.clear();
     this.renewerService.shutdown();
     dtCancelThread.interrupt();
     try {
@@ -230,7 +233,7 @@ public class DelegationTokenRenewer extends AbstractService 
{
     public final Configuration conf;
     public long expirationDate;
     public TimerTask timerTask;
-    public final boolean shouldCancelAtEnd;
+    public volatile boolean shouldCancelAtEnd;
     public long maxDate;
     public String user;
 
@@ -407,12 +410,25 @@ public class DelegationTokenRenewer extends 
AbstractService {
     boolean hasHdfsToken = false;
     for (Token<?> token : tokens) {
       if (token.isManaged()) {
-        tokenList.add(new DelegationTokenToRenew(applicationId,
-            token, getConfig(), now, shouldCancelAtEnd, evt.getUser()));
         if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
           LOG.info(applicationId + " found existing hdfs token " + token);
           hasHdfsToken = true;
         }
+
+        DelegationTokenToRenew dttr = allTokens.get(token);
+        if (dttr != null) {
+          // If any of the jobs sharing the same token doesn't want to cancel
+          // the token, we should not cancel the token.
+          if (!evt.shouldCancelAtEnd) {
+            dttr.shouldCancelAtEnd = evt.shouldCancelAtEnd;
+            LOG.info("Set shouldCancelAtEnd=" + shouldCancelAtEnd
+                + " for token " + dttr.token);
+          }
+          continue;
+        }
+
+        tokenList.add(new DelegationTokenToRenew(applicationId, token,
+          getConfig(), now, shouldCancelAtEnd, evt.getUser()));
       }
     }
 
@@ -429,6 +445,7 @@ public class DelegationTokenRenewer extends AbstractService 
{
       }
       for (DelegationTokenToRenew dtr : tokenList) {
         appTokens.get(applicationId).add(dtr);
+        allTokens.put(dtr.token, dtr);
         setTimerForTokenRenewal(dtr);
       }
     }
@@ -496,7 +513,6 @@ public class DelegationTokenRenewer extends AbstractService 
{
     token.setTimerTask(tTask); // keep reference to the timer
 
     renewalTimer.schedule(token.timerTask, new Date(renewIn));
-
     LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
         + token.applicationId);
   }
@@ -559,6 +575,10 @@ public class DelegationTokenRenewer extends 
AbstractService {
   private void requestNewHdfsDelegationToken(ApplicationId applicationId,
       String user, boolean shouldCancelAtEnd) throws IOException,
       InterruptedException {
+    if (!hasProxyUserPrivileges) {
+      LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs 
tokens.");
+      return;
+    }
     // Get new hdfs tokens for this user
     Credentials credentials = new Credentials();
     Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
@@ -621,6 +641,8 @@ public class DelegationTokenRenewer extends AbstractService 
{
     LOG.error("removing failed delegation token for appid=" + applicationId
         + ";t=" + t.token.getService());
     appTokens.get(applicationId).remove(t);
+    allTokens.remove(t.token);
+
     // cancel the timer
     if (t.timerTask != null) {
       t.timerTask.cancel();
@@ -685,9 +707,14 @@ public class DelegationTokenRenewer extends 
AbstractService {
           cancelToken(dttr);
 
           it.remove();
+          allTokens.remove(dttr.token);
         }
       }
     }
+
+    if(tokens != null && tokens.isEmpty()) {
+      appTokens.remove(applicationId);
+    }
   }
 
   /**
@@ -842,4 +869,9 @@ public class DelegationTokenRenewer extends AbstractService 
{
       return appId;
     }
   }
+
+  // only for testing
+  protected ConcurrentMap<Token<?>, DelegationTokenToRenew> getAllTokens() {
+    return allTokens;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0402bada/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 9d0ac27..5794b43 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -313,7 +313,7 @@ public class MockRM extends ResourceManager {
       boolean waitForAccepted, boolean keepContainers) throws Exception {
     return submitApp(masterMemory, name, user, acls, unmanaged, queue,
         maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
-        false, null, 0, null);
+        false, null, 0, null, true);
   }
 
   public RMApp submitApp(int masterMemory, long 
attemptFailuresValidityInterval)
@@ -322,7 +322,7 @@ public class MockRM extends ResourceManager {
       .getShortUserName(), null, false, null,
       super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
-      false, null, attemptFailuresValidityInterval, null);
+      false, null, attemptFailuresValidityInterval, null, true);
   }
 
   public RMApp submitApp(int masterMemory, String name, String user,
@@ -332,26 +332,24 @@ public class MockRM extends ResourceManager {
       ApplicationId applicationId) throws Exception {
     return submitApp(masterMemory, name, user, acls, unmanaged, queue,
       maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
-      isAppIdProvided, applicationId, 0, null);
+      isAppIdProvided, applicationId, 0, null, true);
   }
 
-  @SuppressWarnings("deprecation")
   public RMApp submitApp(int masterMemory,
       LogAggregationContext logAggregationContext) throws Exception {
     return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
       .getShortUserName(), null, false, null,
       super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
-      false, null, 0, logAggregationContext);
+      false, null, 0, logAggregationContext, true);
    }
 
-  @SuppressWarnings("deprecation")
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
       int maxAppAttempts, Credentials ts, String appType,
       boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
       ApplicationId applicationId, long attemptFailuresValidityInterval,
-      LogAggregationContext logAggregationContext)
+      LogAggregationContext logAggregationContext, boolean 
cancelTokensWhenComplete)
       throws Exception {
     ApplicationId appId = isAppIdProvided ? applicationId : null;
     ApplicationClientProtocol client = getClientRMService();
@@ -392,6 +390,7 @@ public class MockRM extends ResourceManager {
     if (logAggregationContext != null) {
       sub.setLogAggregationContext(logAggregationContext);
     }
+    sub.setCancelTokensWhenComplete(cancelTokensWhenComplete);
     req.setApplicationSubmissionContext(sub);
     UserGroupInformation fakeUser =
       UserGroupInformation.createUserForTesting(user, new String[] 
{"someGroup"});

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0402bada/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index 7275089..5d31404 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -86,6 +87,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityM
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
@@ -116,11 +118,12 @@ public class TestDelegationTokenRenewer {
     private static int counter = 0;
     private static Token<?> lastRenewed = null;
     private static Token<?> tokenToRenewIn2Sec = null;
-
+    private static boolean cancelled = false; 
     private static void reset() {
       counter = 0;
       lastRenewed = null;
       tokenToRenewIn2Sec = null;
+
     }
 
     @Override
@@ -136,7 +139,8 @@ public class TestDelegationTokenRenewer {
     @Override
     public long renew(Token<?> t, Configuration conf) throws IOException {
       if ( !(t instanceof MyToken)) {
-        return 
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
+        // renew in 3 seconds
+        return System.currentTimeMillis() + 3000;
       }
       MyToken token = (MyToken)t;
       if(token.isCanceled()) {
@@ -158,9 +162,12 @@ public class TestDelegationTokenRenewer {
 
     @Override
     public void cancel(Token<?> t, Configuration conf) {
-      MyToken token = (MyToken)t;
-      LOG.info("Cancel token " + token);
-      token.cancelToken();
+      cancelled = true;
+      if (t instanceof MyToken) {
+        MyToken token = (MyToken) t;
+        LOG.info("Cancel token " + token);
+        token.cancelToken();
+      }
    }
     
   }
@@ -921,6 +928,7 @@ public class TestDelegationTokenRenewer {
   // YARN will get the token for the app submitted without the delegation 
token.
   @Test
   public void testAppSubmissionWithoutDelegationToken() throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
     // create token2
     Text userText2 = new Text("user2");
     DelegationTokenIdentifier dtId2 =
@@ -970,4 +978,48 @@ public class TestDelegationTokenRenewer {
     appCredentials.readTokenStorageStream(buf);
     Assert.assertTrue(appCredentials.getAllTokens().contains(token2));
   }
+
+  // Test submitting an application with the token obtained by a previously
+  // submitted application.
+  @Test (timeout = 30000)
+  public void testAppSubmissionWithPreviousToken() throws Exception{
+    MockRM rm = new TestSecurityMockRM(conf, null);
+    rm.start();
+    final MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create Token1:
+    Text userText1 = new Text("user");
+    DelegationTokenIdentifier dtId1 =
+        new DelegationTokenIdentifier(userText1, new Text("renewer1"),
+          userText1);
+    final Token<DelegationTokenIdentifier> token1 =
+        new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+          "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText1, token1);
+
+    // submit app1 with a token, set cancelTokenWhenComplete to false;
+    RMApp app1 =
+        rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
+          null, true, false, false, null, 0, null, false);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+    rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    // submit app2 with the same token, set cancelTokenWhenComplete to true;
+    RMApp app2 =
+        rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
+          null, true, false, false, null, 0, null, true);
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+    rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
+    MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2);
+    Assert.assertTrue(rm.getRMContext().getDelegationTokenRenewer()
+      .getAllTokens().containsKey(token1));
+
+    MockRM.finishAMAndVerifyAppState(app1, rm, nm1, am1);
+    // app2 completes, app1 is still running, check the token is not cancelled
+    Assert.assertFalse(Renewer.cancelled);
+  }
 }

Reply via email to