Repository: incubator-slider
Updated Branches:
  refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry c692461b7 -> c5fb4f040


SLIDER-304 initial token renewal code for review


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/0bd6c6cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/0bd6c6cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/0bd6c6cf

Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry
Commit: 0bd6c6cf93b098358c252355e2fbde7e87bd2fe7
Parents: d3e4bf2
Author: Jon Maron <jma...@hortonworks.com>
Authored: Mon Aug 25 18:30:20 2014 -0400
Committer: Jon Maron <jma...@hortonworks.com>
Committed: Mon Aug 25 18:30:20 2014 -0400

----------------------------------------------------------------------
 .../server/appmaster/SliderAppMaster.java       |  88 ++++---
 .../appmaster/actions/RenewingAction.java       |  35 ++-
 .../security/FsDelegationTokenManager.java      | 257 +++++++++++++++++++
 .../TestFsDelegationTokenManager.groovy         | 244 ++++++++++++++++++
 4 files changed, 584 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0bd6c6cf/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index b4515f1..33ad1df 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -133,6 +133,7 @@ import org.apache.slider.server.appmaster.web.WebAppApiImpl;
 import org.apache.slider.server.appmaster.web.rest.RestPaths;
 import org.apache.slider.server.services.registry.SliderRegistryService;
 import org.apache.slider.server.services.security.CertificateManager;
+import org.apache.slider.server.services.security.FsDelegationTokenManager;
 import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
 import org.apache.slider.server.services.utility.WebAppService;
 import org.apache.slider.server.services.workflow.ServiceThreadFactory;
@@ -351,6 +352,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   private final QueueService actionQueues = new QueueService();
   private String agentOpsUrl;
   private String agentStatusUrl;
+  private FsDelegationTokenManager fsDelegationTokenManager;
 
   /**
    * Service Constructor
@@ -417,8 +419,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   @Override
   protected void serviceStart() throws Exception {
     super.serviceStart();
-    executorService.execute(new QueueExecutor(this, actionQueues));
     executorService.execute(actionQueues);
+    executorService.execute(new QueueExecutor(this, actionQueues));
   }
   
   /* =================================================================== */
@@ -583,36 +585,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     appInformation.put(StatusKeys.INFO_AM_ATTEMPT_ID,
                        appAttemptID.toString());
 
-    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-    Credentials credentials =
-      currentUser.getCredentials();
-    DataOutputBuffer dob = new DataOutputBuffer();
-    credentials.writeTokenStorageToStream(dob);
-    dob.close();
-    // Now remove the AM->RM token so that containers cannot access it.
-    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
-    while (iter.hasNext()) {
-      Token<?> token = iter.next();
-      log.info("Token {}", token.getKind());
-      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
-        iter.remove();
-      }
-    }
-    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-
-    // set up secret manager
-    secretManager = new ClientToAMTokenSecretManager(appAttemptID, null);
-
-    // if not a secure cluster, extract the username -it will be
-    // propagated to workers
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      hadoop_user_name = System.getenv(HADOOP_USER_NAME);
-      service_user_name = hadoop_user_name;
-      log.info(HADOOP_USER_NAME + "='{}'", hadoop_user_name);
-    } else {
-      service_user_name = UserGroupInformation.getCurrentUser().getUserName();
-    }
-
     Map<String, String> envVars;
     List<Container> liveContainers;
     /**
@@ -636,6 +608,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       nmClientAsync = new NMClientAsyncImpl("nmclient", this);
       deployChildService(nmClientAsync);
 
+      // set up secret manager
+      secretManager = new ClientToAMTokenSecretManager(appAttemptID, null);
+
       //bring up the Slider RPC service
       startSliderRPCServer();
 
@@ -788,12 +763,45 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     sliderAMProvider.bind(stateForProviders, registry, actionQueues,
         liveContainers);
 
-    // now do the registration
-    registerServiceInstance(clustername, appid);
-
     // chaos monkey
     maybeStartMonkey();
 
+    // setup token renewal and expiry handling for long lived apps
+    if (SliderUtils.isHadoopClusterSecure(getConfig())) {
+      fsDelegationTokenManager = new FsDelegationTokenManager(actionQueues);
+      fsDelegationTokenManager.acquireDelegationToken(getConfig());
+    }
+
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    Credentials credentials =
+        currentUser.getCredentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    dob.close();
+    // Now remove the AM->RM token so that containers cannot access it.
+    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+    while (iter.hasNext()) {
+      Token<?> token = iter.next();
+      log.info("Token {}", token.getKind());
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove();
+      }
+    }
+    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    // if not a secure cluster, extract the username -it will be
+    // propagated to workers
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      hadoop_user_name = System.getenv(HADOOP_USER_NAME);
+      service_user_name = hadoop_user_name;
+      log.info(HADOOP_USER_NAME + "='{}'", hadoop_user_name);
+    } else {
+      service_user_name = UserGroupInformation.getCurrentUser().getUserName();
+    }
+
+    // now do the registration
+    registerServiceInstance(clustername, appid);
+
     // Start the Slider AM provider
     sliderAMProvider.start();
 
@@ -1066,6 +1074,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       log.debug("Stopped forked process: exit code={}", exitCode);
     }
 
+    if (fsDelegationTokenManager != null) {
+      try {
+        fsDelegationTokenManager.cancelDelegationToken(getConfig());
+      } catch (Exception e) {
+        log.info("Error cancelling HDFS delegation token", e);
+      }
+    }
+
     //stop any launches in progress
     launchService.stop();
 
@@ -1827,7 +1843,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     }
     initAndAddService(monkey);
     // and schedule it
-    schedule(monkey.getChaosAction(monkeyInterval, TimeUnit.SECONDS));
+    if (enabled) {
+      schedule(monkey.getChaosAction(monkeyInterval, TimeUnit.SECONDS));
+    }
     return true;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0bd6c6cf/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java
index c62582f..40c1021 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java
@@ -26,6 +26,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * This action executes then reschedules an inner action; a limit
@@ -36,9 +38,12 @@ public class RenewingAction<A extends AsyncAction> extends AsyncAction {
   private static final Logger log =
       LoggerFactory.getLogger(RenewingAction.class);
   private final A action;
-  private final long interval;
-  private final TimeUnit timeUnit;
+  private long interval;
+  private TimeUnit timeUnit;
   public final AtomicInteger executionCount = new AtomicInteger();
+  private final ReentrantReadWriteLock intervalLock = new 
ReentrantReadWriteLock();
+  private final Lock intervalReadLock = intervalLock.readLock();
+  private final Lock intervalWriteLock = intervalLock.writeLock();
   public final int limit;
 
 
@@ -85,7 +90,7 @@ public class RenewingAction<A extends AsyncAction> extends AsyncAction {
       reschedule = limit > exCount;
     }
     if (reschedule) {
-      this.setNanos(convertAndOffset(interval, timeUnit));
+      this.setNanos(convertAndOffset(getInterval(), getTimeUnit()));
       log.debug("{}: rescheduling, new offset {} mS ", this,
           getDelay(TimeUnit.MILLISECONDS));
       queueService.schedule(this);
@@ -101,11 +106,31 @@ public class RenewingAction<A extends AsyncAction> extends AsyncAction {
   }
 
   public long getInterval() {
-    return interval;
+    intervalReadLock.lock();
+    try {
+      return interval;
+    } finally {
+      intervalReadLock.unlock();
+    }
+  }
+
+  public void updateInterval(long delay, TimeUnit timeUnit) {
+    intervalWriteLock.lock();
+    try {
+      interval = delay;
+      this.timeUnit = timeUnit;
+    } finally {
+      intervalWriteLock.unlock();
+    }
   }
 
   public TimeUnit getTimeUnit() {
-    return timeUnit;
+    intervalReadLock.lock();
+    try {
+      return timeUnit;
+    } finally {
+      intervalReadLock.unlock();
+    }
   }
 
   public int getExecutionCount() {

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0bd6c6cf/slider-core/src/main/java/org/apache/slider/server/services/security/FsDelegationTokenManager.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/security/FsDelegationTokenManager.java b/slider-core/src/main/java/org/apache/slider/server/services/security/FsDelegationTokenManager.java
new file mode 100644
index 0000000..8f0de3d
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/security/FsDelegationTokenManager.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.server.services.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.server.appmaster.SliderAppMaster;
+import org.apache.slider.server.appmaster.actions.AsyncAction;
+import org.apache.slider.server.appmaster.actions.QueueAccess;
+import org.apache.slider.server.appmaster.actions.RenewingAction;
+import org.apache.slider.server.appmaster.state.AppState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class FsDelegationTokenManager {
+  private final QueueAccess queue;
+  private RenewingAction<RenewAction> renewingAction;
+  private UserGroupInformation remoteUser;
+  private UserGroupInformation currentUser;
+  private static final Logger
+      log = LoggerFactory.getLogger(FsDelegationTokenManager.class);
+  private long renewInterval;
+  private RenewAction renewAction;
+  private String tokenName;
+
+  public FsDelegationTokenManager(QueueAccess queue) throws IOException {
+    this.queue = queue;
+    this.currentUser = UserGroupInformation.getCurrentUser();
+  }
+
+  private void createRemoteUser(Configuration configuration) throws 
IOException {
+    Configuration loginConfig = new Configuration(configuration);
+    loginConfig.set(DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION,
+                    "kerberos");
+    // using HDFS principal...
+    this.remoteUser = UserGroupInformation
+        .loginUserFromKeytabAndReturnUGI(
+            loginConfig.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+            loginConfig.get(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY));
+    log.info("Created remote user {}.  UGI reports current user is {}",
+             this.remoteUser, UserGroupInformation.getCurrentUser());
+  }
+
+  public void acquireDelegationToken(Configuration configuration)
+      throws IOException, InterruptedException {
+    if (remoteUser == null) {
+      createRemoteUser(configuration);
+    }
+    if (SliderUtils.isHadoopClusterSecure(configuration) &&
+        renewingAction == null) {
+      renewInterval = configuration.getLong(
+          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+      // constructor of action will retrieve initial token.  One may already be
+      // associated with user, but its lifecycle/management is not clear so 
let's
+      // create and manage a token explicitly
+      renewAction = new RenewAction("HDFS renew",
+                                           configuration);
+      // set retrieved token as the user associated delegation token and
+      // start a renewing action to renew
+      Token<?> token = renewAction.getToken();
+      currentUser.addToken(token.getService(), token);
+      log.info("HDFS delegation token {} acquired and set as credential for 
current user", token);
+      renewingAction = new RenewingAction<RenewAction>(renewAction,
+                                          (int) renewInterval,
+                                          (int) renewInterval,
+                                          TimeUnit.MILLISECONDS,
+                                          getRenewingLimit());
+      log.info("queuing HDFS delegation token renewal interval of {} 
milliseconds",
+               renewInterval);
+      queue(renewingAction);
+    }
+  }
+
+  public void cancelDelegationToken(Configuration configuration)
+      throws IOException, InterruptedException {
+    queue.removeRenewingAction(getRenewingActionName());
+    if (renewAction != null) {
+      renewAction.getToken().cancel(configuration);
+    }
+    log.info("Renewing action {} removed and HDFS delegation token renewal "
+             + "cancelled", getRenewingActionName());
+  }
+
+  protected int getRenewingLimit() {
+    return 0;
+  }
+
+  protected void queue(RenewingAction<RenewAction> action) {
+    queue.renewing(getRenewingActionName(),
+                   action);
+  }
+
+  protected String getRenewingActionName() {
+    if (tokenName == null) {
+      tokenName = "HDFS renewing token " + UUID.randomUUID();
+    }
+    return tokenName;
+  }
+
+  class RenewAction extends AsyncAction {
+    Configuration configuration;
+    Token<?> token;
+    private final FileSystem fs;
+
+    RenewAction(String name,
+                Configuration configuration)
+        throws IOException, InterruptedException {
+      super(name);
+      this.configuration = configuration;
+      fs = getFileSystem();
+      // get initial token by creating a kerberos authenticated user and
+      // invoking token methods as that user
+      synchronized (fs) {
+        this.token = remoteUser.doAs(new PrivilegedExceptionAction<Token<?>>() 
{
+          @Override
+          public Token<?> run() throws Exception {
+            log.info("Obtaining HDFS delgation token with user {}",
+                     remoteUser.getShortUserName());
+            return fs.getDelegationToken(
+                remoteUser.getShortUserName());
+          }
+        });
+      }
+      log.info("Initial request returned delegation token {}", token);
+    }
+
+    protected FileSystem getFileSystem()
+        throws IOException, InterruptedException {
+      // return non-cache FS reference
+      return remoteUser.doAs(new PrivilegedExceptionAction<FileSystem>() {
+        @Override
+        public FileSystem run() throws Exception {
+          Configuration config = new Configuration(configuration);
+          config.setBoolean("fs.hdfs.impl.disable.cache", true);
+          return getRemoteFileSystemForRenewal(config);
+        }
+      });
+    }
+
+    @Override
+    public void execute(SliderAppMaster appMaster, QueueAccess queueService,
+                        AppState appState)
+        throws Exception {
+      if (fs != null) {
+        synchronized(fs) {
+          try {
+            long expires = remoteUser.doAs(new 
PrivilegedExceptionAction<Long>() {
+              @Override
+              public Long run() throws Exception {
+                long expires = token.renew(fs.getConf());
+                log.info("HDFS delegation token renewed");
+                return expires;
+              }
+            });
+            long calculatedInterval = expires - Time.now();
+            if ( calculatedInterval < renewInterval ) {
+              // time to get a new token since the token will expire before
+              // next renewal interval.  Could modify this to be closer to 
expiry
+              // time if deemed necessary....
+              log.info("Interval of {} less than renew interval.  Getting new 
token",
+                       calculatedInterval);
+              getNewToken();
+            } else {
+              updateRenewalTime(renewInterval);
+            }
+          } catch (IOException ie) {
+            // token has expired.  get a new one...
+            log.info("Exception raised by renew", ie);
+            getNewToken();
+          }
+        }
+      }
+    }
+
+    private void getNewToken()
+        throws InterruptedException, IOException {
+      try {
+        Text service = token.getService();
+        Token<?>[] tokens = remoteUser.doAs(new 
PrivilegedExceptionAction<Token<?>[]>() {
+            @Override
+            public Token<?>[] run() throws Exception {
+              return fs.addDelegationTokens(remoteUser.getShortUserName(), 
null);
+            }
+        });
+        if (tokens.length == 0) {
+          throw new IOException("addDelegationTokens returned no tokens");
+        }
+        token = findMatchingToken(service, tokens);
+        currentUser.addToken(token.getService(), token);
+
+        log.info("Expired HDFS delegation token replaced and added as 
credential to current user");
+        updateRenewalTime(renewInterval);
+      } catch (IOException ie2) {
+        throw new IOException("Can't get new delegation token ", ie2);
+      }
+    }
+
+    private void updateRenewalTime(long interval) {
+      long delay = interval - interval/10;
+      renewingAction.updateInterval(delay, TimeUnit.MILLISECONDS);
+      log.info("Token renewal set for {} ms from now", delay);
+    }
+
+    private Token<?> findMatchingToken(Text service, Token<?>[] tokens) {
+      Token<?> token = null;
+      int i = 0;
+      while (token == null && i < tokens.length) {
+        if (tokens[i].getService().equals(service)) {
+          token = tokens[i];
+        }
+        i++;
+      }
+
+      return token;
+    }
+
+    Token<?> getToken() {
+      synchronized (fs) {
+        return token;
+      }
+    }
+  }
+
+  protected FileSystem getRemoteFileSystemForRenewal(Configuration config)
+      throws IOException {
+    return FileSystem.get(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0bd6c6cf/slider-core/src/test/groovy/org/apache/slider/server/services/security/TestFsDelegationTokenManager.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/services/security/TestFsDelegationTokenManager.groovy b/slider-core/src/test/groovy/org/apache/slider/server/services/security/TestFsDelegationTokenManager.groovy
new file mode 100644
index 0000000..245c09a
--- /dev/null
+++ b/slider-core/src/test/groovy/org/apache/slider/server/services/security/TestFsDelegationTokenManager.groovy
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.security
+
+import groovy.util.logging.Slf4j
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem as HadoopFS
+import org.apache.hadoop.fs.RawLocalFileSystem
+import org.apache.hadoop.hdfs.DFSConfigKeys
+import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.SecretManager
+import org.apache.hadoop.security.token.Token
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager
+import org.apache.hadoop.service.ServiceOperations
+import org.apache.hadoop.util.Time
+import org.apache.slider.common.tools.CoreFileSystem
+import org.apache.slider.server.appmaster.actions.ActionStopQueue
+import org.apache.slider.server.appmaster.actions.QueueExecutor
+import org.apache.slider.server.appmaster.actions.QueueService
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * Tests for FsDelegationTokenManager that run against an in-process queue
+ * service, a stub filesystem and stub tokens.
+ */
+@Slf4j
+//@CompileStatic
+class TestFsDelegationTokenManager {
+
+  QueueService queues;
+  FsDelegationTokenManager tokenManager;
+  Configuration conf;
+  UserGroupInformation currentUser;
+
+
+  /**
+   * Starts the queue service, installs a test login user and builds a token
+   * manager whose filesystem, renewing limit and action name are overridden.
+   */
+  @Before
+  void setup() {
+    queues = new QueueService();
+
+    conf = new Configuration()
+    conf.set(
+            DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION,
+            "TOKEN")
+    // renew interval of 1000, so several renewals fit in one test run
+    conf.setLong(
+            DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+            1000)
+    queues.init(conf)
+    queues.start();
+
+    HadoopFS fs = new TestFileSystem()
+
+    // NOTE(review): coreFileSystem is not referenced again in this class
+    CoreFileSystem coreFileSystem = new CoreFileSystem(fs, conf)
+
+    String[] groups = new String[1];
+    groups[0] = 'testGroup1'
+
+    currentUser = UserGroupInformation.createUserForTesting("test", groups)
+    UserGroupInformation.setLoginUser(currentUser)
+
+    tokenManager = new FsDelegationTokenManager(queues) {
+        @Override
+        protected int getRenewingLimit() {
+            return 5
+        }
+
+        @Override
+        protected org.apache.hadoop.fs.FileSystem 
getRemoteFileSystemForRenewal(Configuration config) throws IOException {
+            return new TestFileSystem();
+        }
+
+        @Override
+        protected String getRenewingActionName() {
+            return "TEST RENEW"
+        }
+    }
+
+  }
+
+  /**
+   * Secret manager stub: returns a null identifier and a one-byte password.
+   */
+  public static class DummySecretManager extends
+          AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+    public DummySecretManager(long delegationKeyUpdateInterval,
+                              long delegationTokenMaxLifetime, long 
delegationTokenRenewInterval,
+                              long delegationTokenRemoverScanInterval) {
+      super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+            delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+    }
+
+    @Override
+    public DelegationTokenIdentifier createIdentifier() {
+      return null;
+    }
+
+    @Override
+    public byte[] createPassword(DelegationTokenIdentifier dtId) {
+      return new byte[1];
+    }
+  }
+
+  /**
+   * Filesystem stub whose delegation-token methods return a new TestToken
+   * on every call, each with the next sequence number.
+   */
+  public class TestFileSystem extends RawLocalFileSystem {
+      int sequenceNum = 0;
+      SecretManager<DelegationTokenIdentifier> mgr =
+          new DummySecretManager(0, 0, 0, 0);
+      @Override
+      Token<DelegationTokenIdentifier> getDelegationToken(String renewer) 
throws IOException {
+        return new TestToken(getIdentifier(), mgr);
+      }
+
+      @Override
+      Token<?>[] addDelegationTokens(String renewer, Credentials credentials) 
throws IOException {
+          Token[] tokens = new Token[1]
+          tokens[0] = new TestToken(getIdentifier(), mgr)
+          return tokens
+      }
+
+      // builds an identifier whose three user fields are all the test user
+      private DelegationTokenIdentifier getIdentifier() {
+          def user = new Text(currentUser.getUserName())
+          def id = new DelegationTokenIdentifier(user, user, user)
+          id.setSequenceNumber(sequenceNum++)
+
+          return id
+      }
+  }
+
+  /**
+   * Token stub that counts renew calls and records expiry and cancellation.
+   * While maxCount is greater than zero, renew() throws once the renew
+   * counter's value before the call exceeds maxCount.
+   */
+  public class TestToken extends Token<DelegationTokenIdentifier> {
+      // shared by all tokens; 0 means renew() never throws
+      static long maxCount = 0;
+      private final AtomicLong renewCount = new AtomicLong()
+      private final AtomicLong totalCount = new AtomicLong()
+      public final AtomicBoolean expired = new AtomicBoolean(false);
+      public final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+      TestToken(DelegationTokenIdentifier id, 
SecretManager<DelegationTokenIdentifier> mgr) {
+          super(id, mgr)
+      }
+
+      @Override
+      Text getService() {
+          return new Text("HDFS")
+      }
+
+      @Override
+      long renew(Configuration conf) throws IOException, InterruptedException {
+          totalCount.getAndIncrement();
+          if (maxCount > 0 && renewCount.getAndIncrement() > maxCount) {
+              // simulate expiry: reset the counter and fail this renewal
+              renewCount.set(0L)
+              expired.set(true)
+              throw new IOException("Expired")
+          }
+
+
+          // reported expiry is 10000 ms from now
+          return Time.now() + 10000;
+      }
+
+      @Override
+      void cancel(Configuration conf) throws IOException, InterruptedException 
{
+        cancelled.set(true)
+      }
+  }
+
+  @After
+  void destroyService() {
+    ServiceOperations.stop(queues);
+  }
+
+  /**
+   * Starts the queue service on a new thread and runs the executor on the
+   * calling thread until ex.run() returns.
+   */
+  public void runQueuesToCompletion() {
+    new Thread(queues).start();
+    QueueExecutor ex = new QueueExecutor(queues)
+    ex.run();
+  }
+
+  /**
+   * Starts the queue service and the executor on their own threads, sleeps
+   * for 1000 ms and then cancels the delegation token.
+   */
+  public void runQueuesButNotToCompletion() {
+    new Thread(queues).start();
+    QueueExecutor ex = new QueueExecutor(queues)
+    new Thread(ex).start();
+    Thread.sleep(1000)
+    tokenManager.cancelDelegationToken(conf)
+  }
+
+  /**
+   * The first token of the current user must have been renewed more than
+   * four times by the time the queues finish.
+   */
+  @Test
+  public void testRenew() throws Throwable {
+    tokenManager.acquireDelegationToken(conf)
+    def stop = new ActionStopQueue(10, TimeUnit.SECONDS)
+    queues.schedule(stop);
+    runQueuesToCompletion()
+
+    TestToken token = (TestToken) currentUser.getTokens()[0]
+    assert token.totalCount.get() > 4
+  }
+
+  /**
+   * After cancellation the token is flagged cancelled and the renewing
+   * action can no longer be looked up on the queue.
+   */
+  @Test
+  public void testCancel() throws Throwable {
+    tokenManager.acquireDelegationToken(conf)
+    def stop = new ActionStopQueue(10, TimeUnit.SECONDS)
+    queues.schedule(stop);
+    runQueuesButNotToCompletion()
+
+    TestToken token = (TestToken) currentUser.getTokens()[0]
+    assert token.cancelled.get()
+    assert queues.lookupRenewingAction("TEST RENEW") == null
+  }
+
+
+  /**
+   * With maxCount set to 3 the original token must expire and be replaced
+   * by a different token for the same service.
+   */
+    @Test
+  public void testRenewPastExpiry() throws Throwable {
+    try {
+      TestToken.maxCount = 3L
+      tokenManager.acquireDelegationToken(conf)
+      TestToken origToken = currentUser.getTokens()[0]
+      def stop = new ActionStopQueue(10, TimeUnit.SECONDS)
+      queues.schedule(stop);
+      runQueuesToCompletion()
+
+      TestToken token = (TestToken) currentUser.getTokens()[0]
+      assert token != null
+      assert token != origToken
+      assert origToken.getService().equals(token.getService())
+      assert origToken.totalCount.get() > 4
+      assert origToken.expired.get()
+    } finally {
+      // maxCount is static: restore it so other tests are unaffected
+      TestToken.maxCount = 0
+    }
+  }
+}

Reply via email to