This is an automated email from the ASF dual-hosted git repository. slfan1989 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 503e803b285 HDFS-17128. Updating SQLDelegationTokenSecretManager to use LoadingCa… (#5963) 503e803b285 is described below commit 503e803b285d4d494eba3a4b0d12e235e5de08e5 Author: hchaverri <55413673+hchave...@users.noreply.github.com> AuthorDate: Sat Aug 19 19:12:45 2023 -0700 HDFS-17128. Updating SQLDelegationTokenSecretManager to use LoadingCa… (#5963) --- .../AbstractDelegationTokenSecretManager.java | 12 ++- .../delegation/DelegationTokenLoadingCache.java | 119 ++++++++++++++++++++ .../SQLDelegationTokenSecretManager.java | 96 ++++++++++++----- .../TestSQLDelegationTokenSecretManagerImpl.java | 120 ++++++++++++++++++++- 4 files changed, 312 insertions(+), 35 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index 61c3312c107..bc0ef624d5a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -82,9 +82,8 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent * Cache of currently valid tokens, mapping from DelegationTokenIdentifier * to DelegationTokenInformation. Protected by this object lock. */ - protected final Map<TokenIdent, DelegationTokenInformation> currentTokens - = new ConcurrentHashMap<>(); - + protected Map<TokenIdent, DelegationTokenInformation> currentTokens; + /** * Sequence number to create DelegationTokenIdentifier. * Protected by this object lock. @@ -143,6 +142,7 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent this.tokenRenewInterval = delegationTokenRenewInterval; this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval; this.storeTokenTrackingId = false; + this.currentTokens = new ConcurrentHashMap<>(); } /** @@ -757,10 +757,14 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent for (TokenIdent ident : expiredTokens) { logExpireToken(ident); LOG.info("Removing expired token " + formatTokenId(ident)); - removeStoredToken(ident); + removeExpiredStoredToken(ident); } } + protected void removeExpiredStoredToken(TokenIdent ident) throws IOException { + removeStoredToken(ident); + } + public void stopThreads() { if (LOG.isDebugEnabled()) { LOG.debug("Stopping expired delegation token remover thread"); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenLoadingCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenLoadingCache.java new file mode 100644 index 00000000000..2251f98c143 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenLoadingCache.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.security.token.delegation; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader; +import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; + + +/** + * Cache for delegation tokens that can handle high volume of tokens. A + * loading cache will prevent all active tokens from being in memory at the + * same time. It will also trigger more requests from the persistent token storage. + */ +public class DelegationTokenLoadingCache<K, V> implements Map<K, V> { + private LoadingCache<K, V> internalLoadingCache; + + public DelegationTokenLoadingCache(long cacheExpirationMs, long maximumCacheSize, + Function<K, V> singleEntryFunction) { + this.internalLoadingCache = CacheBuilder.newBuilder() + .expireAfterWrite(cacheExpirationMs, TimeUnit.MILLISECONDS) + .maximumSize(maximumCacheSize) + .build(new CacheLoader<K, V>() { + @Override + public V load(K k) throws Exception { + return singleEntryFunction.apply(k); + } + }); + } + + @Override + public int size() { + return (int) this.internalLoadingCache.size(); + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public boolean containsKey(Object key) { + return this.internalLoadingCache.getIfPresent(key) != null; + } + + @Override + public boolean containsValue(Object value) { + throw new UnsupportedOperationException(); + } + + @SuppressWarnings("unchecked") + @Override + public V get(Object key) { + try { + return this.internalLoadingCache.get((K) key); + } catch (Exception e) { + return null; + } + } + + @Override + public V put(K key, V value) { + this.internalLoadingCache.put(key, value); + return this.internalLoadingCache.getIfPresent(key); + } + + @Override + public V remove(Object key) { + V value = this.internalLoadingCache.getIfPresent(key); + this.internalLoadingCache.invalidate(key); + return value; + } + + @Override + public void putAll(Map<? extends K, ? extends V> m) { + this.internalLoadingCache.putAll(m); + } + + @Override + public void clear() { + this.internalLoadingCache.invalidateAll(); + } + + @Override + public Set<K> keySet() { + return this.internalLoadingCache.asMap().keySet(); + } + + @Override + public Collection<V> values() { + return this.internalLoadingCache.asMap().values(); + } + + @Override + public Set<Entry<K, V>> entrySet() { + return this.internalLoadingCache.asMap().entrySet(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java index 4b6ae21d7a9..75f00d3f924 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java @@ -24,9 +24,13 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.sql.SQLException; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +50,13 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = SQL_DTSM_CONF_PREFIX + "token.seqnum.batch.size"; public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10; + public static final String SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION = SQL_DTSM_CONF_PREFIX + + "token.loading.cache.expiration"; + public static final long SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT = + TimeUnit.SECONDS.toMillis(10); + public static final String SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE = SQL_DTSM_CONF_PREFIX + + "token.loading.cache.max.size"; + public static final long SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE_DEFAULT = 100000; // Batch of sequence numbers that will be requested by the sequenceNumCounter. // A new batch is requested once the sequenceNums available to a secret manager are @@ -71,6 +82,13 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent this.seqNumBatchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE, DEFAULT_SEQ_NUM_BATCH_SIZE); + + long cacheExpirationMs = conf.getTimeDuration(SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION, + SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT, TimeUnit.MILLISECONDS); + long maximumCacheSize = conf.getLong(SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE, + SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE_DEFAULT); + this.currentTokens = new DelegationTokenLoadingCache<>(cacheExpirationMs, maximumCacheSize, + this::getTokenInfoFromSQL); } /** @@ -126,15 +144,11 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent @Override public synchronized TokenIdent cancelToken(Token<TokenIdent> token, String canceller) throws IOException { - try (ByteArrayInputStream bis = new ByteArrayInputStream(token.getIdentifier()); - DataInputStream din = new DataInputStream(bis)) { - TokenIdent id = createIdentifier(); - id.readFields(din); + TokenIdent id = createTokenIdent(token.getIdentifier()); - // Calling getTokenInfo to load token into local cache if not present. - // super.cancelToken() requires token to be present in local cache. - getTokenInfo(id); - } + // Calling getTokenInfo to load token into local cache if not present. + // super.cancelToken() requires token to be present in local cache. + getTokenInfo(id); return super.cancelToken(token, canceller); } @@ -153,6 +167,24 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent } } + @Override + protected void removeExpiredStoredToken(TokenIdent ident) { + try { + // Ensure that the token has not been renewed in SQL by + // another secret manager + DelegationTokenInformation tokenInfo = getTokenInfoFromSQL(ident); + if (tokenInfo.getRenewDate() >= Time.now()) { + LOG.info("Token was renewed by a different router and has not been deleted: {}", ident); + return; + } + removeStoredToken(ident); + } catch (NoSuchElementException e) { + LOG.info("Token has already been deleted by a different router: {}", ident); + } catch (Exception e) { + LOG.warn("Could not remove token {}", ident, e); + } + } + /** * Obtains the DelegationTokenInformation associated with the given * TokenIdentifier in the SQL database. @@ -160,29 +192,35 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent * @return DelegationTokenInformation that matches the given TokenIdentifier or * null if it doesn't exist in the database. */ - @Override - protected DelegationTokenInformation getTokenInfo(TokenIdent ident) { - // Look for token in local cache - DelegationTokenInformation tokenInfo = super.getTokenInfo(ident); - - if (tokenInfo == null) { - try { - // Look for token in SQL database - byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), ident.getBytes()); + @VisibleForTesting + protected DelegationTokenInformation getTokenInfoFromSQL(TokenIdent ident) { + try { + byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), ident.getBytes()); + if (tokenInfoBytes == null) { + // Throw exception so value is not added to cache + throw new NoSuchElementException("Token not found in SQL secret manager: " + ident); + } + return createTokenInfo(tokenInfoBytes); + } catch (SQLException | IOException e) { + LOG.error("Failed to get token in SQL secret manager", e); + throw new RuntimeException(e); + } + } - if (tokenInfoBytes != null) { - tokenInfo = new DelegationTokenInformation(); - try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenInfoBytes)) { - try (DataInputStream dis = new DataInputStream(bis)) { - tokenInfo.readFields(dis); - } - } + private TokenIdent createTokenIdent(byte[] tokenIdentBytes) throws IOException { + try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenIdentBytes); + DataInputStream din = new DataInputStream(bis)) { + TokenIdent id = createIdentifier(); + id.readFields(din); + return id; + } + } - // Update token in local cache - currentTokens.put(ident, tokenInfo); - } - } catch (IOException | SQLException e) { - LOG.error("Failed to get token in SQL secret manager", e); + private DelegationTokenInformation createTokenInfo(byte[] tokenInfoBytes) throws IOException { + DelegationTokenInformation tokenInfo = new DelegationTokenInformation(); + try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenInfoBytes)) { + try (DataInputStream dis = new DataInputStream(bis)) { + tokenInfo.readFields(dis); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java index 569a274042b..5165f7fd770 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java @@ -25,13 +25,21 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -43,6 +51,7 @@ import org.junit.Test; public class TestSQLDelegationTokenSecretManagerImpl { private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore"; private static final int TEST_MAX_RETRIES = 3; + private static final int TOKEN_EXPIRATION_SECONDS = 1; private static Configuration conf; @Before @@ -111,6 +120,98 @@ public class TestSQLDelegationTokenSecretManagerImpl { } } + @Test + public void testCancelToken() throws Exception { + DelegationTokenManager tokenManager1 = createTokenManager(getShortLivedTokenConf()); + DelegationTokenManager tokenManager2 = createTokenManager(getShortLivedTokenConf()); + + TestDelegationTokenSecretManager secretManager2 = + (TestDelegationTokenSecretManager) tokenManager2.getDelegationTokenSecretManager(); + + try { + // Create token on token manager 1 + Token<? extends AbstractDelegationTokenIdentifier> token1 = + tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + + // Load token on token manager 2 to test it doesn't get stale + tokenManager2.verifyToken(token1); + + // Cancel token on token manager 1 + tokenManager1.cancelToken(token1, "foo"); + + // Validate that token cancellation is propagated to token manager 2 + secretManager2.waitForTokenEviction(token1.decodeIdentifier()); + LambdaTestUtils.intercept(SecretManager.InvalidToken.class, + () -> tokenManager2.verifyToken(token1)); + } finally { + stopTokenManager(tokenManager1); + stopTokenManager(tokenManager2); + } + } + + @Test + public void testRenewToken() throws Exception { + DelegationTokenManager tokenManager1 = createTokenManager(getShortLivedTokenConf()); + DelegationTokenManager tokenManager2 = createTokenManager(getShortLivedTokenConf()); + + TestDelegationTokenSecretManager secretManager2 = + (TestDelegationTokenSecretManager) tokenManager2.getDelegationTokenSecretManager(); + + try { + // Create token on token manager 1 + Token<? extends AbstractDelegationTokenIdentifier> token1 = + tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + long expirationTime = Time.monotonicNow() + + TimeUnit.SECONDS.toMillis(TOKEN_EXPIRATION_SECONDS) * 2; + + // Load token on token manager 2 to test it doesn't get stale + tokenManager2.verifyToken(token1); + + // Renew token on token manager 1 and verify token is updated on token manager 2 + // Do this for long enough that the token should be expired if not renewed + AbstractDelegationTokenIdentifier token1Id = + (AbstractDelegationTokenIdentifier) token1.decodeIdentifier(); + while (Time.monotonicNow() < expirationTime) { + tokenManager1.renewToken(token1, "foo"); + callRemoveExpiredTokensAndValidateSQL(secretManager2, token1Id, true); + secretManager2.waitForTokenEviction(token1Id); + tokenManager2.verifyToken(token1); + } + + // Stop renewing token and validate it's no longer valid and removed + // from SQL + Thread.sleep(TimeUnit.SECONDS.toMillis(TOKEN_EXPIRATION_SECONDS) * 2); + LambdaTestUtils.intercept(SecretManager.InvalidToken.class, + () -> tokenManager2.verifyToken(token1)); + callRemoveExpiredTokensAndValidateSQL(secretManager2, token1Id, false); + } finally { + stopTokenManager(tokenManager1); + stopTokenManager(tokenManager2); + } + } + + private Configuration getShortLivedTokenConf() { + Configuration shortLivedConf = new Configuration(conf); + shortLivedConf.setTimeDuration( + SQLDelegationTokenSecretManager.SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION, + 200, TimeUnit.MILLISECONDS); + shortLivedConf.setInt(DelegationTokenManager.RENEW_INTERVAL, TOKEN_EXPIRATION_SECONDS); + return shortLivedConf; + } + + private void callRemoveExpiredTokensAndValidateSQL( + TestDelegationTokenSecretManager secretManager, AbstractDelegationTokenIdentifier tokenId, + boolean expectedInSQL) throws SQLException { + secretManager.removeExpiredStoredToken(tokenId); + byte[] tokenInfo = secretManager.selectTokenInfo(tokenId.getSequenceNumber(), + tokenId.getBytes()); + if (expectedInSQL) { + Assert.assertNotNull("Verify token exists in database", tokenInfo); + } else { + Assert.assertNull("Verify token was removed from database", tokenInfo); + } + } + @Test public void testSequenceNumAllocation() throws Exception { int tokensPerManager = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE * 5; @@ -292,8 +393,13 @@ public class TestSQLDelegationTokenSecretManagerImpl { } private DelegationTokenManager createTokenManager() { + return createTokenManager(conf); + } + + private DelegationTokenManager createTokenManager(Configuration config) { DelegationTokenManager tokenManager = new DelegationTokenManager(new Configuration(), null); - tokenManager.setExternalDelegationTokenSecretManager(new TestDelegationTokenSecretManager()); + tokenManager.setExternalDelegationTokenSecretManager( + new TestDelegationTokenSecretManager(config)); return tokenManager; } @@ -401,7 +507,7 @@ public class TestSQLDelegationTokenSecretManagerImpl { return keyRollLock; } - TestDelegationTokenSecretManager() { + TestDelegationTokenSecretManager(Configuration conf) { super(conf, new TestConnectionFactory(conf), SQLSecretManagerRetriableHandlerImpl.getInstance(conf, new TestRetryHandler())); } @@ -428,6 +534,16 @@ public class TestSQLDelegationTokenSecretManagerImpl { } } + public void waitForTokenEviction(TokenIdentifier tokenId) + throws InterruptedException, TimeoutException { + // Wait until token is not found on cache + GenericTestUtils.waitFor(() -> !this.currentTokens.containsKey(tokenId), 100, 5000); + } + + public void removeExpiredStoredToken(TokenIdentifier tokenId) { + super.removeExpiredStoredToken((AbstractDelegationTokenIdentifier) tokenId); + } + public void setReadOnly(boolean readOnly) { ((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org