Repository: incubator-slider Updated Branches: refs/heads/feature/SLIDER-304_hdfs_token_renewal [created] 0bd6c6cf9
SLIDER-304 initial token renewal code for review Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/0bd6c6cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/0bd6c6cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/0bd6c6cf Branch: refs/heads/feature/SLIDER-304_hdfs_token_renewal Commit: 0bd6c6cf93b098358c252355e2fbde7e87bd2fe7 Parents: d3e4bf2 Author: Jon Maron <jma...@hortonworks.com> Authored: Mon Aug 25 18:30:20 2014 -0400 Committer: Jon Maron <jma...@hortonworks.com> Committed: Mon Aug 25 18:30:20 2014 -0400 ---------------------------------------------------------------------- .../server/appmaster/SliderAppMaster.java | 88 ++++--- .../appmaster/actions/RenewingAction.java | 35 ++- .../security/FsDelegationTokenManager.java | 257 +++++++++++++++++++ .../TestFsDelegationTokenManager.groovy | 244 ++++++++++++++++++ 4 files changed, 584 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0bd6c6cf/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index b4515f1..33ad1df 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -133,6 +133,7 @@ import org.apache.slider.server.appmaster.web.WebAppApiImpl; import org.apache.slider.server.appmaster.web.rest.RestPaths; import org.apache.slider.server.services.registry.SliderRegistryService; import org.apache.slider.server.services.security.CertificateManager; +import 
org.apache.slider.server.services.security.FsDelegationTokenManager; import org.apache.slider.server.services.utility.AbstractSliderLaunchedService; import org.apache.slider.server.services.utility.WebAppService; import org.apache.slider.server.services.workflow.ServiceThreadFactory; @@ -351,6 +352,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService private final QueueService actionQueues = new QueueService(); private String agentOpsUrl; private String agentStatusUrl; + private FsDelegationTokenManager fsDelegationTokenManager; /** * Service Constructor @@ -417,8 +419,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService @Override protected void serviceStart() throws Exception { super.serviceStart(); - executorService.execute(new QueueExecutor(this, actionQueues)); executorService.execute(actionQueues); + executorService.execute(new QueueExecutor(this, actionQueues)); } /* =================================================================== */ @@ -583,36 +585,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService appInformation.put(StatusKeys.INFO_AM_ATTEMPT_ID, appAttemptID.toString()); - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - Credentials credentials = - currentUser.getCredentials(); - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - dob.close(); - // Now remove the AM->RM token so that containers cannot access it. 
- Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); - while (iter.hasNext()) { - Token<?> token = iter.next(); - log.info("Token {}", token.getKind()); - if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { - iter.remove(); - } - } - allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - - // set up secret manager - secretManager = new ClientToAMTokenSecretManager(appAttemptID, null); - - // if not a secure cluster, extract the username -it will be - // propagated to workers - if (!UserGroupInformation.isSecurityEnabled()) { - hadoop_user_name = System.getenv(HADOOP_USER_NAME); - service_user_name = hadoop_user_name; - log.info(HADOOP_USER_NAME + "='{}'", hadoop_user_name); - } else { - service_user_name = UserGroupInformation.getCurrentUser().getUserName(); - } - Map<String, String> envVars; List<Container> liveContainers; /** @@ -636,6 +608,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService nmClientAsync = new NMClientAsyncImpl("nmclient", this); deployChildService(nmClientAsync); + // set up secret manager + secretManager = new ClientToAMTokenSecretManager(appAttemptID, null); + //bring up the Slider RPC service startSliderRPCServer(); @@ -788,12 +763,45 @@ public class SliderAppMaster extends AbstractSliderLaunchedService sliderAMProvider.bind(stateForProviders, registry, actionQueues, liveContainers); - // now do the registration - registerServiceInstance(clustername, appid); - // chaos monkey maybeStartMonkey(); + // setup token renewal and expiry handling for long lived apps + if (SliderUtils.isHadoopClusterSecure(getConfig())) { + fsDelegationTokenManager = new FsDelegationTokenManager(actionQueues); + fsDelegationTokenManager.acquireDelegationToken(getConfig()); + } + + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + Credentials credentials = + currentUser.getCredentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + 
dob.close(); + // Now remove the AM->RM token so that containers cannot access it. + Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); + while (iter.hasNext()) { + Token<?> token = iter.next(); + log.info("Token {}", token.getKind()); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + iter.remove(); + } + } + allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + + // if not a secure cluster, extract the username -it will be + // propagated to workers + if (!UserGroupInformation.isSecurityEnabled()) { + hadoop_user_name = System.getenv(HADOOP_USER_NAME); + service_user_name = hadoop_user_name; + log.info(HADOOP_USER_NAME + "='{}'", hadoop_user_name); + } else { + service_user_name = UserGroupInformation.getCurrentUser().getUserName(); + } + + // now do the registration + registerServiceInstance(clustername, appid); + // Start the Slider AM provider sliderAMProvider.start(); @@ -1066,6 +1074,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService log.debug("Stopped forked process: exit code={}", exitCode); } + if (fsDelegationTokenManager != null) { + try { + fsDelegationTokenManager.cancelDelegationToken(getConfig()); + } catch (Exception e) { + log.info("Error cancelling HDFS delegation token", e); + } + } + //stop any launches in progress launchService.stop(); @@ -1827,7 +1843,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } initAndAddService(monkey); // and schedule it - schedule(monkey.getChaosAction(monkeyInterval, TimeUnit.SECONDS)); + if (enabled) { + schedule(monkey.getChaosAction(monkeyInterval, TimeUnit.SECONDS)); + } return true; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0bd6c6cf/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java index c62582f..40c1021 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java @@ -26,6 +26,8 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * This action executes then reschedules an inner action; a limit @@ -36,9 +38,12 @@ public class RenewingAction<A extends AsyncAction> extends AsyncAction { private static final Logger log = LoggerFactory.getLogger(RenewingAction.class); private final A action; - private final long interval; - private final TimeUnit timeUnit; + private long interval; + private TimeUnit timeUnit; public final AtomicInteger executionCount = new AtomicInteger(); + private final ReentrantReadWriteLock intervalLock = new ReentrantReadWriteLock(); + private final Lock intervalReadLock = intervalLock.readLock(); + private final Lock intervalWriteLock = intervalLock.writeLock(); public final int limit; @@ -85,7 +90,7 @@ public class RenewingAction<A extends AsyncAction> extends AsyncAction { reschedule = limit > exCount; } if (reschedule) { - this.setNanos(convertAndOffset(interval, timeUnit)); + this.setNanos(convertAndOffset(getInterval(), getTimeUnit())); log.debug("{}: rescheduling, new offset {} mS ", this, getDelay(TimeUnit.MILLISECONDS)); queueService.schedule(this); @@ -101,11 +106,31 @@ public class RenewingAction<A extends AsyncAction> extends AsyncAction { } public long getInterval() { - return interval; + intervalReadLock.lock(); + try { + return interval; + } finally { + intervalReadLock.unlock(); + } + } + + public void updateInterval(long delay, TimeUnit timeUnit) { + intervalWriteLock.lock(); + try { + interval = delay; + 
this.timeUnit = timeUnit; + } finally { + intervalWriteLock.unlock(); + } } public TimeUnit getTimeUnit() { - return timeUnit; + intervalReadLock.lock(); + try { + return timeUnit; + } finally { + intervalReadLock.unlock(); + } } public int getExecutionCount() { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0bd6c6cf/slider-core/src/main/java/org/apache/slider/server/services/security/FsDelegationTokenManager.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/services/security/FsDelegationTokenManager.java b/slider-core/src/main/java/org/apache/slider/server/services/security/FsDelegationTokenManager.java new file mode 100644 index 0000000..8f0de3d --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/services/security/FsDelegationTokenManager.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.slider.server.services.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.server.appmaster.SliderAppMaster;
+import org.apache.slider.server.appmaster.actions.AsyncAction;
+import org.apache.slider.server.appmaster.actions.QueueAccess;
+import org.apache.slider.server.appmaster.actions.RenewingAction;
+import org.apache.slider.server.appmaster.state.AppState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages the filesystem delegation token held by the current user.
+ * <p>
+ * {@link #acquireDelegationToken(Configuration)} obtains a token through a
+ * keytab-authenticated user, adds it to the current user's credentials and
+ * queues a {@link RenewingAction} that renews or replaces it.
+ * {@link #cancelDelegationToken(Configuration)} removes that action from the
+ * queue and cancels the token.
+ */
+public class FsDelegationTokenManager {
+  private static final Logger log =
+      LoggerFactory.getLogger(FsDelegationTokenManager.class);
+
+  private final QueueAccess queue;
+  private RenewingAction<RenewAction> renewingAction;
+  private UserGroupInformation remoteUser;
+  private UserGroupInformation currentUser;
+  private long renewInterval;
+  private RenewAction renewAction;
+  private String tokenName;
+
+  /**
+   * Create a manager bound to an action queue.
+   *
+   * @param queue queue on which the renewing action is registered
+   * @throws IOException if the current user cannot be determined
+   */
+  public FsDelegationTokenManager(QueueAccess queue) throws IOException {
+    this.queue = queue;
+    this.currentUser = UserGroupInformation.getCurrentUser();
+  }
+
+  /**
+   * Log in from a keytab and remember the resulting user as
+   * {@code remoteUser}. The principal and keytab are read from
+   * {@code DFS_NAMENODE_USER_NAME_KEY} and
+   * {@code DFS_NAMENODE_KEYTAB_FILE_KEY}; authentication is forced to
+   * "kerberos" on a private copy of the configuration.
+   *
+   * @param configuration configuration supplying principal and keytab
+   * @throws IOException if the login fails
+   */
+  private void createRemoteUser(Configuration configuration)
+      throws IOException {
+    Configuration loginConfig = new Configuration(configuration);
+    loginConfig.set(DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    // using HDFS principal...
+    this.remoteUser = UserGroupInformation
+        .loginUserFromKeytabAndReturnUGI(
+            loginConfig.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+            loginConfig.get(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY));
+    log.info("Created remote user {}.  UGI reports current user is {}",
+        this.remoteUser, UserGroupInformation.getCurrentUser());
+  }
+
+  /**
+   * Acquire a delegation token, add it to the current user's credentials and
+   * queue its periodic renewal. Does nothing when the cluster is not secure
+   * or when a renewing action has already been created.
+   *
+   * @param configuration cluster configuration
+   * @throws IOException on login or token retrieval failure
+   * @throws InterruptedException if a privileged call is interrupted
+   */
+  public void acquireDelegationToken(Configuration configuration)
+      throws IOException, InterruptedException {
+    // Check first: the keytab login below is only meaningful (and only
+    // expected to succeed) on a secure cluster.
+    if (!SliderUtils.isHadoopClusterSecure(configuration)
+        || renewingAction != null) {
+      return;
+    }
+    if (remoteUser == null) {
+      createRemoteUser(configuration);
+    }
+    renewInterval = configuration.getLong(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+    // constructor of action will retrieve initial token.  One may already be
+    // associated with user, but its lifecycle/management is not clear so
+    // let's create and manage a token explicitly
+    renewAction = new RenewAction("HDFS renew", configuration);
+    // set retrieved token as the user associated delegation token and
+    // start a renewing action to renew
+    Token<?> token = renewAction.getToken();
+    currentUser.addToken(token.getService(), token);
+    log.info("HDFS delegation token {} acquired and set as credential for current user", token);
+    // The values are narrowed to int before being handed to RenewingAction;
+    // clamp rather than cast so an interval above Integer.MAX_VALUE ms does
+    // not wrap around to a negative delay.
+    int intervalMillis = (int) Math.min(renewInterval, Integer.MAX_VALUE);
+    renewingAction = new RenewingAction<RenewAction>(renewAction,
+        intervalMillis,
+        intervalMillis,
+        TimeUnit.MILLISECONDS,
+        getRenewingLimit());
+    log.info("queuing HDFS delegation token renewal interval of {} milliseconds",
+        renewInterval);
+    queue(renewingAction);
+  }
+
+  /**
+   * Remove the renewing action from the queue and, if a token was acquired,
+   * cancel it.
+   *
+   * @param configuration configuration passed to the token's cancel call
+   * @throws IOException if cancellation fails
+   * @throws InterruptedException if cancellation is interrupted
+   */
+  public void cancelDelegationToken(Configuration configuration)
+      throws IOException, InterruptedException {
+    queue.removeRenewingAction(getRenewingActionName());
+    if (renewAction != null) {
+      renewAction.getToken().cancel(configuration);
+    }
+    log.info("Renewing action {} removed and HDFS delegation token renewal "
+        + "cancelled", getRenewingActionName());
+  }
+
+  /**
+   * Execution limit handed to the {@link RenewingAction}; overridable for
+   * tests.
+   * NOTE(review): 0 is presumably "no limit" -- confirm against
+   * RenewingAction.
+   *
+   * @return the limit, 0 by default
+   */
+  protected int getRenewingLimit() {
+    return 0;
+  }
+
+  /**
+   * Register the renewing action with the queue under
+   * {@link #getRenewingActionName()}.
+   *
+   * @param action action to register
+   */
+  protected void queue(RenewingAction<RenewAction> action) {
+    queue.renewing(getRenewingActionName(),
+        action);
+  }
+
+  /**
+   * Name under which the renewing action is registered. Built lazily with a
+   * random UUID suffix and then reused for the life of this manager.
+   *
+   * @return the action name
+   */
+  protected String getRenewingActionName() {
+    if (tokenName == null) {
+      tokenName = "HDFS renewing token " + UUID.randomUUID();
+    }
+    return tokenName;
+  }
+
+  /**
+   * Action that renews the managed token and replaces it when renewal fails
+   * or when the token would expire within one renew interval. Access to the
+   * token is guarded by the monitor of the action's filesystem instance.
+   */
+  class RenewAction extends AsyncAction {
+    Configuration configuration;
+    Token<?> token;
+    private final FileSystem fs;
+
+    /**
+     * Create the action and fetch the initial token.
+     *
+     * @param name action name
+     * @param configuration configuration used to create the filesystem
+     * @throws IOException on filesystem or token retrieval failure
+     * @throws InterruptedException if a privileged call is interrupted
+     */
+    RenewAction(String name,
+        Configuration configuration)
+        throws IOException, InterruptedException {
+      super(name);
+      this.configuration = configuration;
+      fs = getFileSystem();
+      // get initial token by creating a kerberos authenticated user and
+      // invoking token methods as that user
+      synchronized (fs) {
+        this.token = remoteUser.doAs(new PrivilegedExceptionAction<Token<?>>() {
+          @Override
+          public Token<?> run() throws Exception {
+            log.info("Obtaining HDFS delgation token with user {}",
+                remoteUser.getShortUserName());
+            return fs.getDelegationToken(
+                remoteUser.getShortUserName());
+          }
+        });
+      }
+      log.info("Initial request returned delegation token {}", token);
+    }
+
+    /**
+     * Obtain the filesystem as the remote user, with the
+     * "fs.hdfs.impl.disable.cache" flag set on a copy of the configuration.
+     *
+     * @return filesystem from {@link #getRemoteFileSystemForRenewal}
+     * @throws IOException on filesystem creation failure
+     * @throws InterruptedException if the privileged call is interrupted
+     */
+    protected FileSystem getFileSystem()
+        throws IOException, InterruptedException {
+      // return non-cache FS reference
+      return remoteUser.doAs(new PrivilegedExceptionAction<FileSystem>() {
+        @Override
+        public FileSystem run() throws Exception {
+          Configuration config = new Configuration(configuration);
+          config.setBoolean("fs.hdfs.impl.disable.cache", true);
+          return getRemoteFileSystemForRenewal(config);
+        }
+      });
+    }
+
+    /**
+     * Renew the token; fetch a replacement when renewal throws an
+     * IOException or when the reported expiry is less than one renew
+     * interval away.
+     */
+    @Override
+    public void execute(SliderAppMaster appMaster, QueueAccess queueService,
+        AppState appState)
+        throws Exception {
+      if (fs != null) {
+        synchronized (fs) {
+          try {
+            long expires = remoteUser.doAs(new PrivilegedExceptionAction<Long>() {
+              @Override
+              public Long run() throws Exception {
+                long expires = token.renew(fs.getConf());
+                log.info("HDFS delegation token renewed");
+                return expires;
+              }
+            });
+            long calculatedInterval = expires - Time.now();
+            // NOTE(review): if the issuer reports expiry as "time of renewal
+            // + renew interval", this difference may always come out just
+            // below renewInterval and a new token would be fetched on every
+            // run -- confirm against the real NameNode behaviour.
+            if (calculatedInterval < renewInterval) {
+              // time to get a new token since the token will expire before
+              // next renewal interval.  Could modify this to be closer to
+              // expiry time if deemed necessary....
+              log.info("Interval of {} less than renew interval.  Getting new token",
+                  calculatedInterval);
+              getNewToken();
+            } else {
+              updateRenewalTime(renewInterval);
+            }
+          } catch (IOException ie) {
+            // token has expired.  get a new one...
+            log.info("Exception raised by renew", ie);
+            getNewToken();
+          }
+        }
+      }
+    }
+
+    /**
+     * Request fresh delegation tokens as the remote user, adopt the one
+     * whose service matches the current token and add it to the current
+     * user's credentials. The current token is left untouched when no
+     * matching replacement is returned.
+     *
+     * @throws IOException if no usable token could be obtained
+     * @throws InterruptedException if the privileged call is interrupted
+     */
+    private void getNewToken()
+        throws InterruptedException, IOException {
+      try {
+        Text service = token.getService();
+        Token<?>[] tokens =
+            remoteUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
+              @Override
+              public Token<?>[] run() throws Exception {
+                return fs.addDelegationTokens(remoteUser.getShortUserName(),
+                    null);
+              }
+            });
+        if (tokens == null || tokens.length == 0) {
+          throw new IOException("addDelegationTokens returned no tokens");
+        }
+        // Validate before assigning: overwriting the field with null would
+        // fail on the next line and break every later renew/cancel call.
+        Token<?> replacement = findMatchingToken(service, tokens);
+        if (replacement == null) {
+          throw new IOException(
+              "addDelegationTokens returned no token for service " + service);
+        }
+        token = replacement;
+        currentUser.addToken(token.getService(), token);
+
+        log.info("Expired HDFS delegation token replaced and added as credential to current user");
+        updateRenewalTime(renewInterval);
+      } catch (IOException ie2) {
+        throw new IOException("Can't get new delegation token ", ie2);
+      }
+    }
+
+    /**
+     * Set the renewing action's interval to 90% of the supplied value.
+     *
+     * @param interval base interval in milliseconds
+     */
+    private void updateRenewalTime(long interval) {
+      long delay = interval - interval / 10;
+      renewingAction.updateInterval(delay, TimeUnit.MILLISECONDS);
+      log.info("Token renewal set for {} ms from now", delay);
+    }
+
+    /**
+     * Find the first token whose service equals the given one.
+     *
+     * @param service service to look for
+     * @param tokens candidates
+     * @return the first match, or null when none matches
+     */
+    private Token<?> findMatchingToken(Text service, Token<?>[] tokens) {
+      for (Token<?> candidate : tokens) {
+        if (candidate.getService().equals(service)) {
+          return candidate;
+        }
+      }
+      return null;
+    }
+
+    /**
+     * @return the token currently managed by this action
+     */
+    Token<?> getToken() {
+      synchronized (fs) {
+        return token;
+      }
+    }
+  }
+
+  /**
+   * Create the filesystem used for token operations; overridable for tests.
+   *
+   * @param config configuration to create the filesystem from
+   * @return result of {@code FileSystem.get(config)}
+   * @throws IOException if the filesystem cannot be created
+   */
+  protected FileSystem getRemoteFileSystemForRenewal(Configuration config)
+      throws IOException {
+    return FileSystem.get(config);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0bd6c6cf/slider-core/src/test/groovy/org/apache/slider/server/services/security/TestFsDelegationTokenManager.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/services/security/TestFsDelegationTokenManager.groovy b/slider-core/src/test/groovy/org/apache/slider/server/services/security/TestFsDelegationTokenManager.groovy
new file mode 100644
index 0000000..245c09a
--- /dev/null
+++ b/slider-core/src/test/groovy/org/apache/slider/server/services/security/TestFsDelegationTokenManager.groovy
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.security
+
+import groovy.util.logging.Slf4j
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem as HadoopFS
+import org.apache.hadoop.fs.RawLocalFileSystem
+import org.apache.hadoop.hdfs.DFSConfigKeys
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.SecretManager
+import org.apache.hadoop.security.token.Token
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager
+import org.apache.hadoop.service.ServiceOperations
+import org.apache.hadoop.util.Time
+import org.apache.slider.common.tools.CoreFileSystem
+import org.apache.slider.server.appmaster.actions.ActionStopQueue
+import org.apache.slider.server.appmaster.actions.QueueExecutor
+import org.apache.slider.server.appmaster.actions.QueueService
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * Exercises FsDelegationTokenManager against an in-memory filesystem and
+ * token: renewal, cancellation, and replacement of a token whose renewal
+ * starts failing.
+ */
+@Slf4j
+//@CompileStatic
+class TestFsDelegationTokenManager {
+
+  QueueService queues;
+  FsDelegationTokenManager tokenManager;
+  Configuration conf;
+  UserGroupInformation currentUser;
+
+
+  /**
+   * Starts a queue service with a 1000 ms renew interval, installs a test
+   * login user and builds a token manager whose filesystem, renewing limit
+   * and action name are overridden.
+   */
+  @Before
+  void setup() {
+    queues = new QueueService();
+
+    conf = new Configuration()
+    conf.set(DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION, "TOKEN")
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+        1000)
+    queues.init(conf)
+    queues.start();
+
+    HadoopFS localFs = new TestFileSystem()
+
+    CoreFileSystem wrappedFs = new CoreFileSystem(localFs, conf)
+
+    String[] testGroups = new String[1];
+    testGroups[0] = 'testGroup1'
+
+    currentUser =
+        UserGroupInformation.createUserForTesting("test", testGroups)
+    UserGroupInformation.setLoginUser(currentUser)
+
+    tokenManager = new FsDelegationTokenManager(queues) {
+      @Override
+      protected int getRenewingLimit() {
+        return 5
+      }
+
+      @Override
+      protected org.apache.hadoop.fs.FileSystem getRemoteFileSystemForRenewal(Configuration config) throws IOException {
+        return new TestFileSystem();
+      }
+
+      @Override
+      protected String getRenewingActionName() {
+        return "TEST RENEW"
+      }
+    }
+
+  }
+
+  /**
+   * Secret manager stub: creates no identifier and returns a one-byte
+   * password for every token.
+   */
+  public static class DummySecretManager extends
+      AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+    public DummySecretManager(long delegationKeyUpdateInterval,
+        long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+        long delegationTokenRemoverScanInterval) {
+      super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+          delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+    }
+
+    @Override
+    public DelegationTokenIdentifier createIdentifier() {
+      return null;
+    }
+
+    @Override
+    public byte[] createPassword(DelegationTokenIdentifier dtId) {
+      return new byte[1];
+    }
+  }
+
+  /**
+   * Local filesystem that hands out TestToken instances, each carrying an
+   * identifier with the next sequence number.
+   */
+  public class TestFileSystem extends RawLocalFileSystem {
+    int sequenceNum = 0;
+    SecretManager<DelegationTokenIdentifier> mgr =
+        new DummySecretManager(0, 0, 0, 0);
+
+    @Override
+    Token<DelegationTokenIdentifier> getDelegationToken(String renewer) throws IOException {
+      return new TestToken(getIdentifier(), mgr);
+    }
+
+    @Override
+    Token<?>[] addDelegationTokens(String renewer, Credentials credentials) throws IOException {
+      Token[] issued = new Token[1]
+      issued[0] = new TestToken(getIdentifier(), mgr)
+      return issued
+    }
+
+    /** Builds an identifier owned, renewed and used by the test user. */
+    private DelegationTokenIdentifier getIdentifier() {
+      def userName = new Text(currentUser.getUserName())
+      def identifier = new DelegationTokenIdentifier(userName, userName, userName)
+      identifier.setSequenceNumber(sequenceNum++)
+
+      return identifier
+    }
+  }
+
+  /**
+   * Token that counts renewals, records cancellation, and fails renewal
+   * with "Expired" once its renew count has passed maxCount (when
+   * maxCount is positive).
+   */
+  public class TestToken extends Token<DelegationTokenIdentifier> {
+    static long maxCount = 0;
+    private final AtomicLong renewCount = new AtomicLong()
+    private final AtomicLong totalCount = new AtomicLong()
+    public final AtomicBoolean expired = new AtomicBoolean(false);
+    public final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+    TestToken(DelegationTokenIdentifier id, SecretManager<DelegationTokenIdentifier> mgr) {
+      super(id, mgr)
+    }
+
+    @Override
+    Text getService() {
+      return new Text("HDFS")
+    }
+
+    @Override
+    long renew(Configuration conf) throws IOException, InterruptedException {
+      totalCount.getAndIncrement();
+      // the renew counter is only consulted (and advanced) when a maximum
+      // has been configured
+      boolean limited = maxCount > 0
+      if (limited) {
+        long renewalsSoFar = renewCount.getAndIncrement()
+        if (renewalsSoFar > maxCount) {
+          renewCount.set(0L)
+          expired.set(true)
+          throw new IOException("Expired")
+        }
+      }
+
+      return Time.now() + 10000;
+    }
+
+    @Override
+    void cancel(Configuration conf) throws IOException, InterruptedException {
+      cancelled.set(true)
+    }
+  }
+
+  @After
+  void destroyService() {
+    ServiceOperations.stop(queues);
+  }
+
+  /**
+   * Runs the queue service on a new thread and the executor on the calling
+   * thread, returning when the executor's run() returns.
+   */
+  public void runQueuesToCompletion() {
+    new Thread(queues).start();
+    QueueExecutor executor = new QueueExecutor(queues)
+    executor.run();
+  }
+
+  /**
+   * Runs both the queue service and the executor on new threads, waits one
+   * second, then cancels the delegation token.
+   */
+  public void runQueuesButNotToCompletion() {
+    new Thread(queues).start();
+    QueueExecutor executor = new QueueExecutor(queues)
+    new Thread(executor).start();
+    Thread.sleep(1000)
+    tokenManager.cancelDelegationToken(conf)
+  }
+
+  /** The token must have been renewed more than four times. */
+  @Test
+  public void testRenew() throws Throwable {
+    tokenManager.acquireDelegationToken(conf)
+    def stopAction = new ActionStopQueue(10, TimeUnit.SECONDS)
+    queues.schedule(stopAction);
+    runQueuesToCompletion()
+
+    TestToken renewed = (TestToken) currentUser.getTokens()[0]
+    assert renewed.totalCount.get() > 4
+  }
+
+  /** Cancelling must cancel the token and remove the renewing action. */
+  @Test
+  public void testCancel() throws Throwable {
+    tokenManager.acquireDelegationToken(conf)
+    def stopAction = new ActionStopQueue(10, TimeUnit.SECONDS)
+    queues.schedule(stopAction);
+    runQueuesButNotToCompletion()
+
+    TestToken current = (TestToken) currentUser.getTokens()[0]
+    assert current.cancelled.get()
+    assert queues.lookupRenewingAction("TEST RENEW") == null
+  }
+
+
+  /**
+   * With renewals limited to three, the original token must expire and be
+   * replaced by a different token for the same service.
+   */
+  @Test
+  public void testRenewPastExpiry() throws Throwable {
+    try {
+      TestToken.maxCount = 3L
+      tokenManager.acquireDelegationToken(conf)
+      TestToken origToken = currentUser.getTokens()[0]
+      def stopAction = new ActionStopQueue(10, TimeUnit.SECONDS)
+      queues.schedule(stopAction);
+      runQueuesToCompletion()
+
+      TestToken replacement = (TestToken) currentUser.getTokens()[0]
+      assert replacement != null
+      assert replacement != origToken
+      assert origToken.getService().equals(replacement.getService())
+      assert origToken.totalCount.get() > 4
+      assert origToken.expired.get()
+    } finally {
+      // restore the shared static so other tests are not limited
+      TestToken.maxCount = 0
+    }
+  }
+}