This is an automated email from the ASF dual-hosted git repository. inigoiri pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 694d2b646f2 HDFS-17148. RBF: SQLDelegationTokenSecretManager must cleanup expired tokens in SQL (#5966) 694d2b646f2 is described below commit 694d2b646f22dfe7dcc5bfe6bc01dfcfa96f0f00 Author: hchaverri <55413673+hchave...@users.noreply.github.com> AuthorDate: Fri Aug 25 17:44:04 2023 -0700 HDFS-17148. RBF: SQLDelegationTokenSecretManager must cleanup expired tokens in SQL (#5966) --- .../AbstractDelegationTokenSecretManager.java | 14 +++- .../SQLDelegationTokenSecretManager.java | 45 ++++++++++++ .../token/SQLDelegationTokenSecretManagerImpl.java | 24 +++++++ .../TestSQLDelegationTokenSecretManagerImpl.java | 80 ++++++++++++++++++++-- 4 files changed, 155 insertions(+), 8 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index bc0ef624d5a..abc4b3e9933 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -178,6 +178,14 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent return currentTokens.size(); } + /** + * Interval for tokens to be renewed. + * @return Renew interval in milliseconds. + */ + protected long getTokenRenewInterval() { + return this.tokenRenewInterval; + } + /** * Add a previously used master key to cache (when NN restarts), * should be called before activate(). @@ -738,7 +746,7 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent Set<TokenIdent> expiredTokens = new HashSet<TokenIdent>(); synchronized (this) { Iterator<Map.Entry<TokenIdent, DelegationTokenInformation>> i = - currentTokens.entrySet().iterator(); + getCandidateTokensForCleanup().entrySet().iterator(); while (i.hasNext()) { Map.Entry<TokenIdent, DelegationTokenInformation> entry = i.next(); long renewDate = entry.getValue().getRenewDate(); @@ -752,6 +760,10 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent logExpireTokens(expiredTokens); } + protected Map<TokenIdent, DelegationTokenInformation> getCandidateTokensForCleanup() { + return this.currentTokens; + } + protected void logExpireTokens( Collection<TokenIdent> expiredTokens) throws IOException { for (TokenIdent ident : expiredTokens) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java index 75f00d3f924..d2c41f31d1d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java @@ -24,6 +24,8 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; @@ -50,6 +52,9 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = SQL_DTSM_CONF_PREFIX + "token.seqnum.batch.size"; public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10; + public static final String SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS = SQL_DTSM_CONF_PREFIX + + "token.max.cleanup.results"; + public static final int SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS_DEFAULT = 1000; public static final String SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION = SQL_DTSM_CONF_PREFIX + "token.loading.cache.expiration"; public static final long SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT = @@ -63,6 +68,9 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent // exhausted, including during initialization. private final int seqNumBatchSize; + // Number of tokens to obtain from SQL during the cleanup process. + private final int maxTokenCleanupResults; + // Last sequenceNum in the current batch that has been allocated to a token. private int currentSeqNum; @@ -82,6 +90,8 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent this.seqNumBatchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE, DEFAULT_SEQ_NUM_BATCH_SIZE); + this.maxTokenCleanupResults = conf.getInt(SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS, + SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS_DEFAULT); long cacheExpirationMs = conf.getTimeDuration(SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION, SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT, TimeUnit.MILLISECONDS); @@ -153,6 +163,39 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent return super.cancelToken(token, canceller); } + /** + * Obtain a list of tokens that will be considered for cleanup, based on the last + * time the token was updated in SQL. This list may include tokens that are not + * expired and should not be deleted (e.g. if the token was last renewed using a + * higher renewal interval). + * The number of results is limited to reduce performance impact. Some level of + * contention is expected when multiple routers run cleanup simultaneously. + * @return Map of tokens that have not been updated in SQL after the token renewal + * period. + */ + @Override + protected Map<TokenIdent, DelegationTokenInformation> getCandidateTokensForCleanup() { + Map<TokenIdent, DelegationTokenInformation> tokens = new HashMap<>(); + try { + // Query SQL for tokens that haven't been updated after + // the last token renewal period. + long maxModifiedTime = Time.now() - getTokenRenewInterval(); + Map<byte[], byte[]> tokenInfoBytesList = selectStaleTokenInfos(maxModifiedTime, + this.maxTokenCleanupResults); + + LOG.info("Found {} tokens for cleanup", tokenInfoBytesList.size()); + for (Map.Entry<byte[], byte[]> tokenInfoBytes : tokenInfoBytesList.entrySet()) { + TokenIdent tokenIdent = createTokenIdent(tokenInfoBytes.getKey()); + DelegationTokenInformation tokenInfo = createTokenInfo(tokenInfoBytes.getValue()); + tokens.put(tokenIdent, tokenInfo); + } + } catch (IOException | SQLException e) { + LOG.error("Failed to get candidate tokens for cleanup in SQL secret manager", e); + } + + return tokens; + } + /** * Removes the existing TokenInformation from the SQL database to * invalidate it. @@ -415,6 +458,8 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent // Token operations in SQL database protected abstract byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier) throws SQLException; + protected abstract Map<byte[], byte[]> selectStaleTokenInfos(long maxModifiedTime, + int maxResults) throws SQLException; protected abstract void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo) throws SQLException; protected abstract void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java index 7da54778f31..e85baae0c3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java @@ -23,6 +23,9 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -150,6 +153,27 @@ public class SQLDelegationTokenSecretManagerImpl }); } + @Override + protected Map<byte[], byte[]> selectStaleTokenInfos(long maxModifiedTime, int maxResults) + throws SQLException { + return retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(); + PreparedStatement statement = connection.prepareStatement( + "SELECT tokenIdentifier, tokenInfo FROM Tokens WHERE modifiedTime < ?")) { + statement.setTimestamp(1, new Timestamp(maxModifiedTime)); + statement.setMaxRows(maxResults); + try (ResultSet result = statement.executeQuery()) { + Map<byte[], byte[]> results = new HashMap<>(); + while (result.next()) { + results.put(result.getBytes("tokenIdentifier"), + result.getBytes("tokenInfo")); + } + return results; + } + } + }); + } + @Override protected void insertDelegationKey(int keyId, byte[] delegationKey) throws SQLException { retryHandler.execute(() -> { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java index 5165f7fd770..679a2dc04da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java @@ -30,12 +30,15 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; @@ -52,6 +55,7 @@ public class TestSQLDelegationTokenSecretManagerImpl { private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore"; private static final int TEST_MAX_RETRIES = 3; private static final int TOKEN_EXPIRATION_SECONDS = 1; + private static final int TOKEN_EXPIRATION_SCAN_SECONDS = 1; private static Configuration conf; @Before @@ -75,6 +79,7 @@ public class TestSQLDelegationTokenSecretManagerImpl { conf.set(SQLConnectionFactory.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver"); conf.setInt(SQLSecretManagerRetriableHandlerImpl.MAX_RETRIES, TEST_MAX_RETRIES); conf.setInt(SQLSecretManagerRetriableHandlerImpl.RETRY_SLEEP_TIME_MS, 10); + conf.setInt(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, TOKEN_EXPIRATION_SCAN_SECONDS); } @AfterClass @@ -190,6 +195,63 @@ public class TestSQLDelegationTokenSecretManagerImpl { } } + @Test + public void testRemoveExpiredTokens() throws Exception { + DelegationTokenManager tokenManager = createTokenManager(getShortLivedTokenConf()); + + try { + TestDelegationTokenSecretManager secretManager = + (TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager(); + + // Create token to be constantly renewed. + Token<? extends AbstractDelegationTokenIdentifier> token1 = + tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo"); + AbstractDelegationTokenIdentifier tokenId1 = + (AbstractDelegationTokenIdentifier) token1.decodeIdentifier(); + + // Create token expected to expire soon. + long expirationTime2 = Time.now(); + AbstractDelegationTokenIdentifier tokenId2 = storeToken(secretManager, 2, expirationTime2); + + // Create token not expected to expire soon. + long expirationTime3 = Time.now() + TimeUnit.SECONDS.toMillis(TOKEN_EXPIRATION_SECONDS) * 10; + AbstractDelegationTokenIdentifier tokenId3 = storeToken(secretManager, 3, expirationTime3); + + GenericTestUtils.waitFor(() -> { + try { + // Constantly renew token so it doesn't expire. + tokenManager.renewToken(token1, "foo"); + + // Wait for cleanup to happen so expired token is deleted from SQL. + return !isTokenInSQL(secretManager, tokenId2); + } catch (IOException | SQLException e) { + throw new RuntimeException(e); + } + }, 100, 6000); + + Assert.assertTrue("Renewed token must not be cleaned up", + isTokenInSQL(secretManager, tokenId1)); + Assert.assertTrue("Token with future expiration must not be cleaned up", + isTokenInSQL(secretManager, tokenId3)); + } finally { + stopTokenManager(tokenManager); + } + } + + private AbstractDelegationTokenIdentifier storeToken( + TestDelegationTokenSecretManager secretManager, int sequenceNum, long expirationTime) + throws IOException { + AbstractDelegationTokenIdentifier tokenId = new DelegationTokenIdentifier(new Text("Test")); + tokenId.setOwner(new Text("foo")); + tokenId.setSequenceNumber(sequenceNum); + + AbstractDelegationTokenSecretManager.DelegationTokenInformation tokenInfo = + new AbstractDelegationTokenSecretManager.DelegationTokenInformation(expirationTime, null); + secretManager.storeToken(tokenId, tokenInfo); + + return tokenId; + } + private Configuration getShortLivedTokenConf() { Configuration shortLivedConf = new Configuration(conf); shortLivedConf.setTimeDuration( @@ -203,13 +265,12 @@ public class TestSQLDelegationTokenSecretManagerImpl { TestDelegationTokenSecretManager secretManager, AbstractDelegationTokenIdentifier tokenId, boolean expectedInSQL) throws SQLException { secretManager.removeExpiredStoredToken(tokenId); - byte[] tokenInfo = secretManager.selectTokenInfo(tokenId.getSequenceNumber(), - tokenId.getBytes()); - if (expectedInSQL) { - Assert.assertNotNull("Verify token exists in database", tokenInfo); - } else { - Assert.assertNull("Verify token was removed from database", tokenInfo); - } + Assert.assertEquals(expectedInSQL, isTokenInSQL(secretManager, tokenId)); + } + + private boolean isTokenInSQL(TestDelegationTokenSecretManager secretManager, + AbstractDelegationTokenIdentifier tokenId) throws SQLException { + return secretManager.selectTokenInfo(tokenId.getSequenceNumber(), tokenId.getBytes()) != null; } @Test @@ -544,6 +605,11 @@ public class TestSQLDelegationTokenSecretManagerImpl { super.removeExpiredStoredToken((AbstractDelegationTokenIdentifier) tokenId); } + public void storeToken(AbstractDelegationTokenIdentifier ident, + DelegationTokenInformation tokenInfo) throws IOException { + super.storeToken(ident, tokenInfo); + } + public void setReadOnly(boolean readOnly) { ((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org