Repository: hadoop Updated Branches: refs/heads/branch-2 6eb88c278 -> d35eba7b1 refs/heads/trunk bf8e4332c -> bd8196e85
HADOOP-11157. ZKDelegationTokenSecretManager never shuts down listenerThreadPool. Contributed by Arun Suresh. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bd8196e8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bd8196e8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bd8196e8 Branch: refs/heads/trunk Commit: bd8196e85e49d44de57237a59bcd7ceae4332c2e Parents: bf8e433 Author: Aaron T. Myers <a...@apache.org> Authored: Mon Nov 17 12:57:52 2014 -0800 Committer: Aaron T. Myers <a...@apache.org> Committed: Mon Nov 17 13:02:49 2014 -0800 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../ZKDelegationTokenSecretManager.java | 126 +++++++-- .../TestZKDelegationTokenSecretManager.java | 275 +++++++++++++++---- 3 files changed, 331 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd8196e8/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index bce342b..bc63c75 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -441,6 +441,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11294. Nfs3FileAttributes should not change the values of rdev, nlink and size in the constructor. (Brandon Li via wheat9) + HADOOP-11157. ZKDelegationTokenSecretManager never shuts down + listenerThreadPool. (Arun Suresh via atm) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd8196e8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index ebc45a5..d6bc995 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.security.auth.login.AppConfigurationEntry; @@ -38,6 +39,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory.Builder; import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.imps.DefaultACLProvider; +import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -48,6 +50,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -80,6 +83,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract + "zkSessionTimeout"; public static final String ZK_DTSM_ZK_CONNECTION_TIMEOUT = ZK_CONF_PREFIX + "zkConnectionTimeout"; + public static final String ZK_DTSM_ZK_SHUTDOWN_TIMEOUT = ZK_CONF_PREFIX + + "zkShutdownTimeout"; public static final String ZK_DTSM_ZNODE_WORKING_PATH = ZK_CONF_PREFIX + "znodeWorkingPath"; public static final String ZK_DTSM_ZK_AUTH_TYPE = ZK_CONF_PREFIX @@ -94,6 +99,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3; public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000; public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000; + public static final int ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT = 10000; public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm"; private static Logger LOG = LoggerFactory @@ -125,6 +131,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract private PathChildrenCache keyCache; private PathChildrenCache tokenCache; private ExecutorService listenerThreadPool; + private final long shutdownTimeout; public ZKDelegationTokenSecretManager(Configuration conf) { super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL, @@ -135,6 +142,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract DelegationTokenManager.RENEW_INTERVAL_DEFAULT * 1000), conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000); + shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, + ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT); if (CURATOR_TL.get() != null) { zkClient = CURATOR_TL.get().usingNamespace( @@ -199,7 +208,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract .build(); isExternalClient = false; } - listenerThreadPool = Executors.newFixedThreadPool(2); } private String setJaasConfiguration(Configuration config) throws Exception { @@ -290,6 +298,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract throw new IOException("Could not start Curator Framework", e); } } + listenerThreadPool = Executors.newSingleThreadExecutor(); try { delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0); if (delTokSeqCounter != null) { @@ -315,7 +324,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract try { keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true); if (keyCache != null) { - keyCache.start(StartMode.POST_INITIALIZED_EVENT); + keyCache.start(StartMode.BUILD_INITIAL_CACHE); keyCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, @@ -343,7 +352,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract try { tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true); if (tokenCache != null) { - tokenCache.start(StartMode.POST_INITIALIZED_EVENT); + tokenCache.start(StartMode.BUILD_INITIAL_CACHE); tokenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override @@ -351,13 +360,13 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: - processTokenAddOrUpdate(event.getData().getData()); + processTokenAddOrUpdate(event.getData()); break; case CHILD_UPDATED: - processTokenAddOrUpdate(event.getData().getData()); + processTokenAddOrUpdate(event.getData()); break; case CHILD_REMOVED: - processTokenRemoved(event.getData().getData()); + processTokenRemoved(event.getData()); break; default: break; @@ -376,7 +385,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract DataInputStream din = new DataInputStream(bin); DelegationKey key = new DelegationKey(); key.readFields(din); - allKeys.put(key.getKeyId(), key); + synchronized (this) { + allKeys.put(key.getKeyId(), key); + } } private void processKeyRemoved(String path) { @@ -386,13 +397,15 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract int j = tokSeg.indexOf('_'); if (j > 0) { int keyId = Integer.parseInt(tokSeg.substring(j + 1)); - allKeys.remove(keyId); + synchronized (this) { + allKeys.remove(keyId); + } } } } - private void processTokenAddOrUpdate(byte[] data) throws IOException { - ByteArrayInputStream bin = new ByteArrayInputStream(data); + private void processTokenAddOrUpdate(ChildData data) throws IOException { + ByteArrayInputStream bin = new ByteArrayInputStream(data.getData()); DataInputStream din = new DataInputStream(bin); TokenIdent ident = createIdentifier(); ident.readFields(din); @@ -403,41 +416,78 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract if (numRead > -1) { DelegationTokenInformation tokenInfo = new DelegationTokenInformation(renewDate, password); - currentTokens.put(ident, tokenInfo); + synchronized (this) { + currentTokens.put(ident, tokenInfo); + // The cancel task might be waiting + notifyAll(); + } } } - private void processTokenRemoved(byte[] data) throws IOException { - ByteArrayInputStream bin = new ByteArrayInputStream(data); + private void processTokenRemoved(ChildData data) throws IOException { + ByteArrayInputStream bin = new ByteArrayInputStream(data.getData()); DataInputStream din = new DataInputStream(bin); TokenIdent ident = createIdentifier(); ident.readFields(din); - currentTokens.remove(ident); + synchronized (this) { + currentTokens.remove(ident); + // The cancel task might be waiting + notifyAll(); + } } @Override public void stopThreads() { + super.stopThreads(); try { - if (!isExternalClient && (zkClient != null)) { - zkClient.close(); + if (tokenCache != null) { + tokenCache.close(); } + } catch (Exception e) { + LOG.error("Could not stop Delegation Token Cache", e); + } + try { if (delTokSeqCounter != null) { delTokSeqCounter.close(); } + } catch (Exception e) { + LOG.error("Could not stop Delegation Token Counter", e); + } + try { if (keyIdSeqCounter != null) { keyIdSeqCounter.close(); } + } catch (Exception e) { + LOG.error("Could not stop Key Id Counter", e); + } + try { if (keyCache != null) { keyCache.close(); } - if (tokenCache != null) { - tokenCache.close(); + } catch (Exception e) { + LOG.error("Could not stop KeyCache", e); + } + try { + if (!isExternalClient && (zkClient != null)) { + zkClient.close(); } } catch (Exception e) { LOG.error("Could not stop Curator Framework", e); - // Ignore } - super.stopThreads(); + if (listenerThreadPool != null) { + listenerThreadPool.shutdown(); + try { + // wait for existing tasks to terminate + if (!listenerThreadPool.awaitTermination(shutdownTimeout, + TimeUnit.MILLISECONDS)) { + LOG.error("Forcing Listener threadPool to shutdown !!"); + listenerThreadPool.shutdownNow(); + } + } catch (InterruptedException ie) { + listenerThreadPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } } private void createPersistentNode(String nodePath) throws Exception { @@ -460,6 +510,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract try { while (!delTokSeqCounter.trySetCount(delTokSeqCounter.getCount() + 1)) { } + } catch (InterruptedException e) { + // The ExpirationThread is just finishing.. so dont do anything.. + LOG.debug("Thread interrupted while performing token counter increment", e); + Thread.currentThread().interrupt(); } catch (Exception e) { throw new RuntimeException("Could not increment shared counter !!", e); } @@ -485,6 +539,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract try { while (!keyIdSeqCounter.trySetCount(keyIdSeqCounter.getCount() + 1)) { } + } catch (InterruptedException e) { + // The ExpirationThread is just finishing.. so dont do anything.. + LOG.debug("Thread interrupted while performing keyId increment", e); + Thread.currentThread().interrupt(); } catch (Exception e) { throw new RuntimeException("Could not increment shared keyId counter !!", e); } @@ -588,13 +646,11 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract @Override protected void storeDelegationKey(DelegationKey key) throws IOException { - allKeys.put(key.getKeyId(), key); addOrUpdateDelegationKey(key, false); } @Override protected void updateDelegationKey(DelegationKey key) throws IOException { - allKeys.put(key.getKeyId(), key); addOrUpdateDelegationKey(key, true); } @@ -658,7 +714,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract @Override protected void storeToken(TokenIdent ident, DelegationTokenInformation tokenInfo) throws IOException { - currentTokens.put(ident, tokenInfo); try { addOrUpdateToken(ident, tokenInfo, false); } catch (Exception e) { @@ -669,7 +724,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract @Override protected void updateToken(TokenIdent ident, DelegationTokenInformation tokenInfo) throws IOException { - currentTokens.put(ident, tokenInfo); String nodeRemovePath = getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber()); @@ -711,6 +765,25 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract } } + @Override + public synchronized TokenIdent cancelToken(Token<TokenIdent> token, + String canceller) throws IOException { + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + TokenIdent id = createIdentifier(); + id.readFields(in); + try { + if (!currentTokens.containsKey(id)) { + // See if token can be retrieved and placed in currentTokens + getTokenInfo(id); + } + return super.cancelToken(token, canceller); + } catch (Exception e) { + LOG.error("Exception while checking if token exist !!", e); + return id; + } + } + private void addOrUpdateToken(TokenIdent ident, DelegationTokenInformation info, boolean isUpdate) throws Exception { String nodeCreatePath = @@ -772,4 +845,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract static String getNodePath(String root, String nodeName) { return (root + "/" + nodeName); } + + @VisibleForTesting + public ExecutorService getListenerThreadPool() { + return listenerThreadPool; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd8196e8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java index b3049c4..6435c0b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java @@ -18,6 +18,10 @@ package org.apache.hadoop.security.token.delegation; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; + import org.apache.curator.test.TestingServer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@ -36,6 +40,12 @@ import org.junit.Test; public class TestZKDelegationTokenSecretManager { + private static final int TEST_RETRIES = 2; + + private static final int RETRY_COUNT = 5; + + private static final int RETRY_WAIT = 1000; + private static final long DAY_IN_SECS = 86400; private TestingServer zkServer; @@ -59,6 +69,7 @@ public class TestZKDelegationTokenSecretManager { conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString); conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath"); conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none"); + conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, 100); conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS); conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS); conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS); @@ -69,80 +80,246 @@ public class TestZKDelegationTokenSecretManager { @SuppressWarnings("unchecked") @Test public void testMultiNodeOperations() throws Exception { - DelegationTokenManager tm1, tm2 = null; - String connectString = zkServer.getConnectString(); - Configuration conf = getSecretConf(connectString); - tm1 = new DelegationTokenManager(conf, new Text("bla")); - tm1.init(); - tm2 = new DelegationTokenManager(conf, new Text("bla")); - tm2.init(); + for (int i = 0; i < TEST_RETRIES; i++) { + DelegationTokenManager tm1, tm2 = null; + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + tm1 = new DelegationTokenManager(conf, new Text("bla")); + tm1.init(); + tm2 = new DelegationTokenManager(conf, new Text("bla")); + tm2.init(); - Token<DelegationTokenIdentifier> token = - (Token<DelegationTokenIdentifier>) tm1.createToken( - UserGroupInformation.getCurrentUser(), "foo"); - Assert.assertNotNull(token); - tm2.verifyToken(token); - tm2.renewToken(token, "foo"); - tm1.verifyToken(token); - tm1.cancelToken(token, "foo"); - try { + Token<DelegationTokenIdentifier> token = + (Token<DelegationTokenIdentifier>) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); tm2.verifyToken(token); - fail("Expected InvalidToken"); - } catch (SecretManager.InvalidToken it) { - // Ignore - } + tm2.renewToken(token, "foo"); + tm1.verifyToken(token); + tm1.cancelToken(token, "foo"); + try { + verifyTokenFail(tm2, token); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + // Ignore + } - token = (Token<DelegationTokenIdentifier>) tm2.createToken( - UserGroupInformation.getCurrentUser(), "bar"); - Assert.assertNotNull(token); - tm1.verifyToken(token); - tm1.renewToken(token, "bar"); - tm2.verifyToken(token); - tm2.cancelToken(token, "bar"); - try { + token = (Token<DelegationTokenIdentifier>) tm2.createToken( + UserGroupInformation.getCurrentUser(), "bar"); + Assert.assertNotNull(token); tm1.verifyToken(token); - fail("Expected InvalidToken"); - } catch (SecretManager.InvalidToken it) { - // Ignore + tm1.renewToken(token, "bar"); + tm2.verifyToken(token); + tm2.cancelToken(token, "bar"); + try { + verifyTokenFail(tm1, token); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + // Ignore + } + verifyDestroy(tm1, conf); + verifyDestroy(tm2, conf); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testNodeUpAferAWhile() throws Exception { + for (int i = 0; i < TEST_RETRIES; i++) { + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + DelegationTokenManager tm1 = new DelegationTokenManager(conf, new Text("bla")); + tm1.init(); + Token<DelegationTokenIdentifier> token1 = + (Token<DelegationTokenIdentifier>) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token1); + Token<DelegationTokenIdentifier> token2 = + (Token<DelegationTokenIdentifier>) tm1.createToken( + UserGroupInformation.getCurrentUser(), "bar"); + Assert.assertNotNull(token2); + Token<DelegationTokenIdentifier> token3 = + (Token<DelegationTokenIdentifier>) tm1.createToken( + UserGroupInformation.getCurrentUser(), "boo"); + Assert.assertNotNull(token3); + + tm1.verifyToken(token1); + tm1.verifyToken(token2); + tm1.verifyToken(token3); + + // Cancel one token + tm1.cancelToken(token1, "foo"); + + // Start second node after some time.. + Thread.sleep(1000); + DelegationTokenManager tm2 = new DelegationTokenManager(conf, new Text("bla")); + tm2.init(); + + tm2.verifyToken(token2); + tm2.verifyToken(token3); + try { + verifyTokenFail(tm2, token1); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + // Ignore + } + + // Create a new token thru the new ZKDTSM + Token<DelegationTokenIdentifier> token4 = + (Token<DelegationTokenIdentifier>) tm2.createToken( + UserGroupInformation.getCurrentUser(), "xyz"); + Assert.assertNotNull(token4); + tm2.verifyToken(token4); + tm1.verifyToken(token4); + + // Bring down tm2 + verifyDestroy(tm2, conf); + + // Start third node after some time.. + Thread.sleep(1000); + DelegationTokenManager tm3 = new DelegationTokenManager(conf, new Text("bla")); + tm3.init(); + + tm3.verifyToken(token2); + tm3.verifyToken(token3); + tm3.verifyToken(token4); + try { + verifyTokenFail(tm3, token1); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + // Ignore + } + + verifyDestroy(tm3, conf); + verifyDestroy(tm1, conf); } } @SuppressWarnings("unchecked") @Test public void testRenewTokenSingleManager() throws Exception { - DelegationTokenManager tm1 = null; - String connectString = zkServer.getConnectString(); - Configuration conf = getSecretConf(connectString); - tm1 = new DelegationTokenManager(conf, new Text("foo")); - tm1.init(); + for (int i = 0; i < TEST_RETRIES; i++) { + DelegationTokenManager tm1 = null; + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + tm1 = new DelegationTokenManager(conf, new Text("foo")); + tm1.init(); - Token<DelegationTokenIdentifier> token = - (Token<DelegationTokenIdentifier>) - tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); - Assert.assertNotNull(token); - tm1.renewToken(token, "foo"); - tm1.verifyToken(token); + Token<DelegationTokenIdentifier> token = + (Token<DelegationTokenIdentifier>) + tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); + tm1.renewToken(token, "foo"); + tm1.verifyToken(token); + verifyDestroy(tm1, conf); + } } @SuppressWarnings("unchecked") @Test public void testCancelTokenSingleManager() throws Exception { + for (int i = 0; i < TEST_RETRIES; i++) { + DelegationTokenManager tm1 = null; + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + tm1 = new DelegationTokenManager(conf, new Text("foo")); + tm1.init(); + + Token<DelegationTokenIdentifier> token = + (Token<DelegationTokenIdentifier>) + tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); + tm1.cancelToken(token, "foo"); + try { + verifyTokenFail(tm1, token); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + it.printStackTrace(); + } + verifyDestroy(tm1, conf); + } + } + + @SuppressWarnings("rawtypes") + protected void verifyDestroy(DelegationTokenManager tm, Configuration conf) + throws Exception { + AbstractDelegationTokenSecretManager sm = + tm.getDelegationTokenSecretManager(); + ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager) sm; + ExecutorService es = zksm.getListenerThreadPool(); + tm.destroy(); + Assert.assertTrue(es.isShutdown()); + // wait for the pool to terminate + long timeout = + conf.getLong( + ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, + ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT); + Thread.sleep(timeout * 3); + Assert.assertTrue(es.isTerminated()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testStopThreads() throws Exception { DelegationTokenManager tm1 = null; String connectString = zkServer.getConnectString(); + + // let's make the update interval short and the shutdown interval + // comparatively longer, so if the update thread runs after shutdown, + // it will cause an error. + final long updateIntervalSeconds = 1; + final long shutdownTimeoutMillis = updateIntervalSeconds * 1000 * 5; Configuration conf = getSecretConf(connectString); + conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, updateIntervalSeconds); + conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, updateIntervalSeconds); + conf.setLong(DelegationTokenManager.RENEW_INTERVAL, updateIntervalSeconds); + + conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, shutdownTimeoutMillis); tm1 = new DelegationTokenManager(conf, new Text("foo")); tm1.init(); Token<DelegationTokenIdentifier> token = - (Token<DelegationTokenIdentifier>) - tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + (Token<DelegationTokenIdentifier>) + tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); Assert.assertNotNull(token); - tm1.cancelToken(token, "foo"); + + AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager(); + ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm; + ExecutorService es = zksm.getListenerThreadPool(); + es.submit(new Callable<Void>() { + public Void call() throws Exception { + Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow + return null; + } + }); + + tm1.destroy(); + } + + // Since it is possible that there can be a delay for the cancel token message + // initiated by one node to reach another node.. The second node can ofcourse + // verify with ZK directly if the token that needs verification has been + // cancelled but.. that would mean having to make an RPC call for every + // verification request. + // Thus, the eventual consistency tradef-off should be acceptable here... + private void verifyTokenFail(DelegationTokenManager tm, + Token<DelegationTokenIdentifier> token) throws IOException, + InterruptedException { + verifyTokenFailWithRetry(tm, token, RETRY_COUNT); + } + + private void verifyTokenFailWithRetry(DelegationTokenManager tm, + Token<DelegationTokenIdentifier> token, int retryCount) + throws IOException, InterruptedException { try { - tm1.verifyToken(token); - fail("Expected InvalidToken"); - } catch (SecretManager.InvalidToken it) { - it.printStackTrace(); + tm.verifyToken(token); + } catch (SecretManager.InvalidToken er) { + throw er; + } + if (retryCount > 0) { + Thread.sleep(RETRY_WAIT); + verifyTokenFailWithRetry(tm, token, retryCount - 1); } } + }