YARN-4041. Slow delegation token renewal can severely prolong RM recovery. Contributed by Sunil G


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3a34a4f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3a34a4f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3a34a4f

Branch: refs/heads/HDFS-8966
Commit: d3a34a4f388155f6a7ef040e244ce7be788cd28b
Parents: 533a2be
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Oct 23 20:57:01 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Oct 23 20:57:01 2015 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../server/resourcemanager/rmapp/RMAppImpl.java | 14 ++--
 .../security/DelegationTokenRenewer.java        | 69 ++++++++++++++++++--
 .../server/resourcemanager/TestRMRestart.java   | 24 +++----
 4 files changed, 86 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3a34a4f/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 125ff94..7e30ac9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1076,6 +1076,9 @@ Release 2.7.2 - UNRELEASED
     YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called 
     by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks)
 
+    YARN-4041. Slow delegation token renewal can severely prolong RM recovery.
+    (Sunil G via jlowe)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3a34a4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 43a3a51..41254d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -946,14 +946,16 @@ public class RMAppImpl implements RMApp, Recoverable {
       }
 
       if (UserGroupInformation.isSecurityEnabled()) {
-        // synchronously renew delegation token on recovery.
+        // asynchronously renew delegation token on recovery.
         try {
-          app.rmContext.getDelegationTokenRenewer().addApplicationSync(
-            app.getApplicationId(), app.parseCredentials(),
-            app.submissionContext.getCancelTokensWhenComplete(), app.getUser());
+          app.rmContext.getDelegationTokenRenewer()
+              .addApplicationAsyncDuringRecovery(app.getApplicationId(),
+                  app.parseCredentials(),
+                  app.submissionContext.getCancelTokensWhenComplete(),
+                  app.getUser());
         } catch (Exception e) {
-          String msg = "Failed to renew token for " + app.applicationId
-                  + " on recovery : " + e.getMessage();
+          String msg = "Failed to fetch user credentials from application:"
+              + e.getMessage();
           app.diagnostics.append(msg);
           LOG.error(msg, e);
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3a34a4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index 426e460..cca14e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -388,6 +388,25 @@ public class DelegationTokenRenewer extends AbstractService {
   }
 
   /**
+   * Asynchronously add application tokens for renewal.
+   *
+   * @param applicationId
+   *          added application
+   * @param ts
+   *          tokens
+   * @param shouldCancelAtEnd
+   *          true if tokens should be canceled when the app is done else false.
+   * @param user
+   *          user
+   */
+  public void addApplicationAsyncDuringRecovery(ApplicationId applicationId,
+      Credentials ts, boolean shouldCancelAtEnd, String user) {
+    processDelegationTokenRenewerEvent(
+        new DelegationTokenRenewerAppRecoverEvent(applicationId, ts,
+            shouldCancelAtEnd, user));
+  }
+
+  /**
    * Synchronously renew delegation tokens.
    * @param user user
    */
@@ -398,7 +417,7 @@ public class DelegationTokenRenewer extends AbstractService {
       applicationId, ts, shouldCancelAtEnd, user));
   }
 
-  private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
+  private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
       throws IOException, InterruptedException {
     ApplicationId applicationId = evt.getApplicationId();
     Credentials ts = evt.getCredentials();
@@ -842,6 +861,10 @@ public class DelegationTokenRenewer extends AbstractService {
         DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
             (DelegationTokenRenewerAppSubmitEvent) evt;
         handleDTRenewerAppSubmitEvent(appSubmitEvt);
+      } else if (evt instanceof DelegationTokenRenewerAppRecoverEvent) {
+        DelegationTokenRenewerAppRecoverEvent appRecoverEvt =
+            (DelegationTokenRenewerAppRecoverEvent) evt;
+        handleDTRenewerAppRecoverEvent(appRecoverEvt);
       } else if (evt.getType().equals(
           DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
         DelegationTokenRenewer.this.handleAppFinishEvent(evt);
@@ -876,17 +899,50 @@ public class DelegationTokenRenewer extends AbstractService {
       }
     }
   }
-  
-  static class DelegationTokenRenewerAppSubmitEvent extends
+
+  @SuppressWarnings("unchecked")
+  private void handleDTRenewerAppRecoverEvent(
+      DelegationTokenRenewerAppRecoverEvent event) {
+    try {
+      // Setup tokens for renewal during recovery
+      DelegationTokenRenewer.this.handleAppSubmitEvent(event);
+    } catch (Throwable t) {
+      LOG.warn(
+          "Unable to add the application to the delegation token renewer.", t);
+    }
+  }
+
+  static class DelegationTokenRenewerAppSubmitEvent
+      extends
+        AbstractDelegationTokenRenewerAppEvent {
+    public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
+        Credentials credentails, boolean shouldCancelAtEnd, String user) {
+      super(appId, credentails, shouldCancelAtEnd, user,
+          DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
+    }
+  }
+
+  static class DelegationTokenRenewerAppRecoverEvent
+      extends
+        AbstractDelegationTokenRenewerAppEvent {
+    public DelegationTokenRenewerAppRecoverEvent(ApplicationId appId,
+        Credentials credentails, boolean shouldCancelAtEnd, String user) {
+      super(appId, credentails, shouldCancelAtEnd, user,
+          DelegationTokenRenewerEventType.RECOVER_APPLICATION);
+    }
+  }
+
+  static class AbstractDelegationTokenRenewerAppEvent extends
       DelegationTokenRenewerEvent {
 
     private Credentials credentials;
     private boolean shouldCancelAtEnd;
     private String user;
 
-    public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
-        Credentials credentails, boolean shouldCancelAtEnd, String user) {
-      super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
+    public AbstractDelegationTokenRenewerAppEvent(ApplicationId appId,
+        Credentials credentails, boolean shouldCancelAtEnd, String user,
+        DelegationTokenRenewerEventType type) {
+      super(appId, type);
       this.credentials = credentails;
       this.shouldCancelAtEnd = shouldCancelAtEnd;
       this.user = user;
@@ -907,6 +963,7 @@ public class DelegationTokenRenewer extends AbstractService {
   
   enum DelegationTokenRenewerEventType {
     VERIFY_AND_START_APPLICATION,
+    RECOVER_APPLICATION,
     FINISH_APPLICATION
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3a34a4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 531a4a9..cd84208 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -1179,24 +1179,24 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
 
     // Need to wait for a while as now token renewal happens on another thread
     // and is asynchronous in nature.
-    waitForTokensToBeRenewed(rm2);
+    waitForTokensToBeRenewed(rm2, tokenSet);
 
     // verify tokens are properly populated back to rm2 DelegationTokenRenewer
     Assert.assertEquals(tokenSet, rm2.getRMContext()
       .getDelegationTokenRenewer().getDelegationTokens());
   }
 
-  private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
-    int waitCnt = 20;
-    boolean atleastOneAppInNEWState = true;
-    while (waitCnt-- > 0 && atleastOneAppInNEWState) {
-      atleastOneAppInNEWState = false;
-      for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) {
-        if (rmApp.getState() == RMAppState.NEW) {
-          Thread.sleep(1000);
-          atleastOneAppInNEWState = true;
-          break;
-        }
+  private void waitForTokensToBeRenewed(MockRM rm2,
+      HashSet<Token<RMDelegationTokenIdentifier>> tokenSet) throws Exception {
+    // Max wait time to get the token renewal can be kept as 1sec (100 * 10ms)
+    int waitCnt = 100;
+    while (waitCnt-- > 0) {
+      if (tokenSet.equals(rm2.getRMContext().getDelegationTokenRenewer()
+          .getDelegationTokens())) {
+        // Stop waiting as tokens are populated to DelegationTokenRenewer.
+        break;
+      } else {
+        Thread.sleep(10);
       }
     }
   }

Reply via email to