This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.1 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 7ea8276729e48c073f05e953e1754082ea544e92 Author: Jingsong Lee <[email protected]> AuthorDate: Wed Apr 30 23:17:49 2025 +0800 [rest] Simplify auth provider to synchronously refresh token (#5562) --- .../java/org/apache/paimon/rest/HttpClient.java | 3 - .../java/org/apache/paimon/rest/RESTCatalog.java | 36 +-- .../org/apache/paimon/rest/RESTCatalogOptions.java | 8 - .../java/org/apache/paimon/rest/RESTClient.java | 3 +- .../org/apache/paimon/rest/RESTTokenFileIO.java | 9 +- .../org/apache/paimon/rest/auth/AuthProvider.java | 22 +- .../paimon/rest/auth/AuthProviderFactory.java | 11 +- .../org/apache/paimon/rest/auth/AuthSession.java | 152 ------------- .../paimon/rest/auth/BearTokenAuthProvider.java | 7 +- .../apache/paimon/rest/auth/DLFAuthProvider.java | 138 +++++------- .../paimon/rest/auth/DLFAuthProviderFactory.java | 9 +- .../apache/paimon/rest/auth/DLFECSTokenLoader.java | 7 +- .../paimon/rest/auth/DLFLocalFileTokenLoader.java | 42 ++-- .../java/org/apache/paimon/rest/auth/DLFToken.java | 28 ++- .../apache/paimon/rest/auth/DLFTokenLoader.java | 2 + .../apache/paimon/rest/auth/RESTAuthFunction.java | 11 +- .../org/apache/paimon/rest/HttpClientTest.java | 4 +- .../apache/paimon/rest/MockRESTCatalogTest.java | 4 +- .../org/apache/paimon/rest/RESTCatalogServer.java | 2 +- .../org/apache/paimon/rest/RESTCatalogTest.java | 2 +- ...{AuthSessionTest.java => AuthProviderTest.java} | 247 +++++++-------------- .../paimon/rest/auth/CustomTestDLFTokenLoader.java | 7 +- 22 files changed, 241 insertions(+), 513 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java index dca501b693..12f7f2938a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java @@ -241,7 +241,4 @@ public class HttpClient implements RESTClient { new RESTAuthParameter(path, queryParams, method, data); return headerFunction.apply(restAuthParameter); } - - @Override - public void close() {} } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 53336f5f2c..7ac3882ed1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -33,7 +33,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; -import org.apache.paimon.rest.auth.AuthSession; +import org.apache.paimon.rest.auth.AuthProvider; import org.apache.paimon.rest.auth.RESTAuthFunction; import org.apache.paimon.rest.auth.RESTAuthParameter; import org.apache.paimon.rest.exceptions.AlreadyExistsException; @@ -102,7 +102,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; import java.util.stream.Collectors; @@ -118,8 +117,7 @@ import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; import static org.apache.paimon.rest.RESTUtil.extractPrefixMap; -import static org.apache.paimon.rest.auth.AuthSession.createAuthSession; -import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool; +import static org.apache.paimon.rest.auth.AuthProviderFactory.createAuthProvider; /** A catalog implementation for REST. */ public class RESTCatalog implements Catalog { @@ -128,6 +126,7 @@ public class RESTCatalog implements Catalog { public static final String MAX_RESULTS = "maxResults"; public static final String PAGE_TOKEN = "pageToken"; public static final String QUERY_PARAMETER_WAREHOUSE_KEY = "warehouse"; + public static final long TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000L; private final RESTClient client; private final ResourcePaths resourcePaths; @@ -135,15 +134,13 @@ public class RESTCatalog implements Catalog { private final boolean dataTokenEnabled; private final RESTAuthFunction restAuthFunction; - private volatile ScheduledExecutorService refreshExecutor = null; - public RESTCatalog(CatalogContext context) { this(context, true); } public RESTCatalog(CatalogContext context, boolean configRequired) { this.client = new HttpClient(context.options().get(RESTCatalogOptions.URI)); - AuthSession catalogAuth = createAuthSession(context.options(), tokenRefreshExecutor()); + AuthProvider authProvider = createAuthProvider(context.options()); Options options = context.options(); Map<String, String> baseHeaders = Collections.emptyMap(); if (configRequired) { @@ -161,11 +158,11 @@ public class RESTCatalog implements Catalog { queryParams, ConfigResponse.class, new RESTAuthFunction( - Collections.emptyMap(), catalogAuth)) + Collections.emptyMap(), authProvider)) .merge(context.options().toMap())); baseHeaders.putAll(extractPrefixMap(options, HEADER_PREFIX)); } - this.restAuthFunction = new RESTAuthFunction(baseHeaders, catalogAuth); + this.restAuthFunction = new RESTAuthFunction(baseHeaders, authProvider); context = CatalogContext.create(options, context.preferIO(), context.fallbackIO()); this.context = context; this.resourcePaths = ResourcePaths.forCatalogProperties(options); @@ -951,14 +948,7 @@ public class RESTCatalog implements Catalog { } @Override - public void close() throws Exception { - if (refreshExecutor != null) { - refreshExecutor.shutdownNow(); - } - if (client != null) { - client.close(); - } - } + public void close() throws Exception {} @VisibleForTesting Map<String, String> headers(RESTAuthParameter restAuthParameter) { @@ -1004,18 +994,6 @@ public class RESTCatalog implements Catalog { return results; } - private ScheduledExecutorService tokenRefreshExecutor() { - if (refreshExecutor == null) { - synchronized (this) { - if (refreshExecutor == null) { - this.refreshExecutor = createScheduledThreadPool(1, "token-refresh-thread"); - } - } - } - - return refreshExecutor; - } - private FileIO fileIOForData(Path path, Identifier identifier) { return dataTokenEnabled ? new RESTTokenFileIO(catalogLoader(), this, identifier, path) diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 0359f6fa25..8c5cc586b2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -21,8 +21,6 @@ package org.apache.paimon.rest; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; -import java.time.Duration; - /** Options for REST Catalog. */ public class RESTCatalogOptions { @@ -38,12 +36,6 @@ public class RESTCatalogOptions { .noDefaultValue() .withDescription("REST Catalog auth bear token."); - public static final ConfigOption<Duration> TOKEN_REFRESH_TIME = - ConfigOptions.key("token.refresh-time") - .durationType() - .defaultValue(Duration.ofHours(1)) - .withDescription("REST Catalog auth token refresh time."); - public static final ConfigOption<String> TOKEN_PROVIDER = ConfigOptions.key("token.provider") .stringType() diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java index ac1a27a2be..b2058ec806 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java @@ -20,11 +20,10 @@ package org.apache.paimon.rest; import org.apache.paimon.rest.auth.RESTAuthFunction; -import java.io.Closeable; import java.util.Map; /** Interface for a basic HTTP Client for interfacing with the REST catalog. */ -public interface RESTClient extends Closeable { +public interface RESTClient { <T extends RESTResponse> T get( String path, Class<T> responseType, RESTAuthFunction restAuthFunction); diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index bcf7840dc5..13e31e8634 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -46,6 +46,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE; +import static org.apache.paimon.rest.RESTCatalog.TOKEN_EXPIRATION_SAFE_TIME_MILLIS; /** A {@link FileIO} to support getting token from REST Server. */ public class RESTTokenFileIO implements FileIO { @@ -191,11 +192,13 @@ public class RESTTokenFileIO implements FileIO { } private boolean shouldRefresh() { - return token == null || token.expireAtMillis() - System.currentTimeMillis() < 3600_000L; + return token == null + || token.expireAtMillis() - System.currentTimeMillis() + < TOKEN_EXPIRATION_SAFE_TIME_MILLIS; } private void refreshToken() { - LOG.info("begin refresh token for identifier [{}]", identifier); + LOG.info("begin refresh data token for identifier [{}]", identifier); GetTableTokenResponse response; if (catalogInstance != null) { try { @@ -211,7 +214,7 @@ public class RESTTokenFileIO implements FileIO { } } LOG.info( - "end refresh token for identifier [{}] expiresAtMillis [{}]", + "end refresh data token for identifier [{}] expiresAtMillis [{}]", identifier, response.getExpiresAtMillis()); diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java index 666337ec79..d7755ff053 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java @@ -19,28 +19,10 @@ package org.apache.paimon.rest.auth; import java.util.Map; -import java.util.Optional; /** Authentication provider. */ public interface AuthProvider { - Map<String, String> header(Map<String, String> baseHeader, RESTAuthParameter restAuthParameter); - - boolean refresh(); - - default boolean keepRefreshed() { - return false; - } - - default boolean willSoonExpire() { - return false; - } - - default Optional<Long> expiresAtMillis() { - return Optional.empty(); - } - - default Optional<Long> tokenRefreshInMills() { - return Optional.empty(); - } + Map<String, String> mergeAuthHeader( + Map<String, String> baseHeader, RESTAuthParameter restAuthParameter); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java index 1cccfeb626..7562451a34 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java @@ -21,18 +21,25 @@ package org.apache.paimon.rest.auth; import org.apache.paimon.factories.Factory; import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.options.Options; +import org.apache.paimon.utils.StringUtils; + +import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER; /** Factory for {@link AuthProvider}. */ public interface AuthProviderFactory extends Factory { AuthProvider create(Options options); - static AuthProvider createAuthProvider(String name, Options options) { + static AuthProvider createAuthProvider(Options options) { + String tokenProvider = options.get(TOKEN_PROVIDER); + if (StringUtils.isEmpty(tokenProvider)) { + throw new IllegalArgumentException("token.provider is not set."); + } AuthProviderFactory factory = FactoryUtil.discoverFactory( AuthProviderFactory.class.getClassLoader(), AuthProviderFactory.class, - name); + tokenProvider); return factory.create(options); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java deleted file mode 100644 index 1ce391e00d..0000000000 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.rest.auth; - -import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.options.Options; -import org.apache.paimon.rest.RESTCatalogOptions; -import org.apache.paimon.utils.StringUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Optional; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** Authentication session. */ -public class AuthSession { - - static final int REFRESH_NUM_RETRIES = 5; - static final long MIN_REFRESH_WAIT_MILLIS = 10; - static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes - - private static final Logger LOG = LoggerFactory.getLogger(AuthSession.class); - - private final AuthProvider authProvider; - - public AuthSession(AuthProvider authProvider) { - this.authProvider = authProvider; - } - - public static AuthSession fromRefreshAuthProvider( - ScheduledExecutorService executor, AuthProvider authProvider) { - AuthSession session = new AuthSession(authProvider); - - long startTimeMillis = System.currentTimeMillis(); - Optional<Long> expiresAtMillisOpt = authProvider.expiresAtMillis(); - - // when init session if token expire time is in the past, refresh it and update - // expiresAtMillis - if (expiresAtMillisOpt.isPresent() && expiresAtMillisOpt.get() <= startTimeMillis) { - boolean refreshSuccessful = session.refresh(); - if (refreshSuccessful) { - expiresAtMillisOpt = session.authProvider.expiresAtMillis(); - } - } - - if (null != executor && expiresAtMillisOpt.isPresent()) { - scheduleTokenRefresh(executor, session, expiresAtMillisOpt.get()); - } - - return session; - } - - public AuthProvider getAuthProvider() { - if (this.authProvider.keepRefreshed() && this.authProvider.willSoonExpire()) { - refresh(); - } - return this.authProvider; - } - - public Boolean refresh() { - if (this.authProvider.keepRefreshed() - && this.authProvider.tokenRefreshInMills().isPresent()) { - return this.authProvider.refresh(); - } - - return false; - } - - @VisibleForTesting - static void scheduleTokenRefresh( - ScheduledExecutorService executor, AuthSession session, long expiresAtMillis) { - scheduleTokenRefresh(executor, session, expiresAtMillis, 0); - } - - @VisibleForTesting - static long getTimeToWaitByExpiresInMills(long expiresInMillis) { - // how much ahead of time to start the refresh to allow it to complete - long refreshWindowMillis = Math.min(expiresInMillis, MAX_REFRESH_WINDOW_MILLIS); - // how much time to wait before expiration - long waitIntervalMillis = expiresInMillis - refreshWindowMillis; - // how much time to actually wait - return Math.max(waitIntervalMillis, MIN_REFRESH_WAIT_MILLIS); - } - - private static void scheduleTokenRefresh( - ScheduledExecutorService executor, - AuthSession session, - long expiresAtMillis, - int retryTimes) { - if (retryTimes < REFRESH_NUM_RETRIES) { - long expiresInMillis = expiresAtMillis - System.currentTimeMillis(); - long timeToWait = getTimeToWaitByExpiresInMills(expiresInMillis); - - executor.schedule( - () -> doRefresh(executor, session, expiresAtMillis, retryTimes), - timeToWait, - TimeUnit.MILLISECONDS); - } else { - LOG.warn("Failed to refresh token after {} retries.", REFRESH_NUM_RETRIES); - } - } - - private static void doRefresh( - ScheduledExecutorService executor, - AuthSession session, - long expiresAtMillis, - int retryTimes) { - long refreshStartTime = System.currentTimeMillis(); - boolean isSuccessful = session.refresh(); - if (isSuccessful) { - scheduleTokenRefresh( - executor, - session, - refreshStartTime + session.authProvider.tokenRefreshInMills().get(), - 0); - } else { - scheduleTokenRefresh(executor, session, expiresAtMillis, retryTimes + 1); - } - } - - public static AuthSession createAuthSession( - Options options, ScheduledExecutorService refreshExecutor) { - String tokenProvider = options.get(RESTCatalogOptions.TOKEN_PROVIDER); - if (StringUtils.isEmpty(tokenProvider)) { - throw new IllegalArgumentException("token.provider is not set."); - } - AuthProvider authProvider = AuthProviderFactory.createAuthProvider(tokenProvider, options); - if (authProvider.keepRefreshed()) { - return AuthSession.fromRefreshAuthProvider(refreshExecutor, authProvider); - } else { - return new AuthSession(authProvider); - } - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java index a501b98556..39625818a6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java @@ -35,15 +35,10 @@ public class BearTokenAuthProvider implements AuthProvider { } @Override - public Map<String, String> header( + public Map<String, String> mergeAuthHeader( Map<String, String> baseHeader, RESTAuthParameter restAuthParameter) { Map<String, String> headersWithAuth = new HashMap<>(baseHeader); headersWithAuth.put(AUTHORIZATION_HEADER_KEY, BEARER_PREFIX + token); return headersWithAuth; } - - @Override - public boolean refresh() { - return true; - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java index f82720b33f..7eece82a92 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java @@ -18,19 +18,28 @@ package org.apache.paimon.rest.auth; +import org.apache.paimon.annotation.VisibleForTesting; + import okhttp3.MediaType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; -import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; -import java.util.Optional; + +import static org.apache.paimon.rest.RESTCatalog.TOKEN_EXPIRATION_SAFE_TIME_MILLIS; +import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Auth provider for <b>Ali CLoud</b> DLF. */ public class DLFAuthProvider implements AuthProvider { + private static final Logger LOG = LoggerFactory.getLogger(DLFAuthProvider.class); + public static final String DLF_AUTHORIZATION_HEADER_KEY = "Authorization"; public static final String DLF_CONTENT_MD5_HEADER_KEY = "Content-MD5"; public static final String DLF_CONTENT_TYPE_KEY = "Content-Type"; @@ -39,55 +48,36 @@ public class DLFAuthProvider implements AuthProvider { public static final String DLF_AUTH_VERSION_HEADER_KEY = "x-dlf-version"; public static final String DLF_CONTENT_SHA56_HEADER_KEY = "x-dlf-content-sha256"; public static final String DLF_CONTENT_SHA56_VALUE = "UNSIGNED-PAYLOAD"; - public static final double EXPIRED_FACTOR = 0.4; - public static final DateTimeFormatter TOKEN_DATE_FORMATTER = - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"); + public static final DateTimeFormatter AUTH_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'"); - public static final DateTimeFormatter AUTH_DATE_FORMATTER = - DateTimeFormatter.ofPattern("yyyyMMdd"); protected static final MediaType MEDIA_TYPE = MediaType.parse("application/json"); - private final DLFTokenLoader tokenLoader; - - protected DLFToken token; - private final boolean keepRefreshed; - private Long expiresAtMillis; - private final Long tokenRefreshInMills; + @Nullable private final DLFTokenLoader tokenLoader; + @Nullable protected DLFToken token; private final String region; - public static DLFAuthProvider buildRefreshToken( - DLFTokenLoader tokenLoader, Long tokenRefreshInMills, String region) { - DLFToken token = tokenLoader.loadToken(); - Long expiresAtMillis = getExpirationInMills(token.getExpiration()); - return new DLFAuthProvider( - tokenLoader, token, true, expiresAtMillis, tokenRefreshInMills, region); + public static DLFAuthProvider fromTokenLoader(DLFTokenLoader tokenLoader, String region) { + return new DLFAuthProvider(tokenLoader, null, region); } - public static DLFAuthProvider buildAKToken( + public static DLFAuthProvider fromAccessKey( String accessKeyId, String accessKeySecret, String securityToken, String region) { DLFToken token = new DLFToken(accessKeyId, accessKeySecret, securityToken, null); - return new DLFAuthProvider(null, token, false, null, null, region); + return new DLFAuthProvider(null, token, region); } public DLFAuthProvider( - DLFTokenLoader tokenLoader, - DLFToken token, - boolean keepRefreshed, - Long expiresAtMillis, - Long tokenRefreshInMills, - String region) { + @Nullable DLFTokenLoader tokenLoader, @Nullable DLFToken token, String region) { this.tokenLoader = tokenLoader; this.token = token; - this.keepRefreshed = keepRefreshed; - this.expiresAtMillis = expiresAtMillis; - this.tokenRefreshInMills = tokenRefreshInMills; this.region = region; } @Override - public Map<String, String> header( + public Map<String, String> mergeAuthHeader( Map<String, String> baseHeader, RESTAuthParameter restAuthParameter) { + DLFToken token = getFreshToken(); try { String dateTime = baseHeader.getOrDefault( @@ -109,6 +99,43 @@ public class DLFAuthProvider implements AuthProvider { } } + @VisibleForTesting + DLFToken getFreshToken() { + if (shouldRefresh()) { + synchronized (this) { + if (shouldRefresh()) { + refreshToken(); + } + } + } + return token; + } + + private void refreshToken() { + checkNotNull(tokenLoader); + LOG.info("begin refresh meta token for loader [{}]", tokenLoader.description()); + this.token = tokenLoader.loadToken(); + checkNotNull(token); + LOG.info( + "end refresh meta token for loader [{}] expiresAtMillis [{}]", + tokenLoader.description(), + token.getExpirationAtMills()); + } + + private boolean shouldRefresh() { + // no token, get new one + if (token == null) { + return true; + } + // never expire + Long expireTime = token.getExpirationAtMills(); + if (expireTime == null) { + return false; + } + long now = System.currentTimeMillis(); + return expireTime - now < TOKEN_EXPIRATION_SAFE_TIME_MILLIS; + } + public static Map<String, String> generateSignHeaders( String data, String dateTime, String securityToken) throws Exception { Map<String, String> signHeaders = new HashMap<>(); @@ -124,53 +151,4 @@ public class DLFAuthProvider implements AuthProvider { } return signHeaders; } - - @Override - public boolean refresh() { - long start = System.currentTimeMillis(); - DLFToken newToken = tokenLoader.loadToken(); - if (newToken == null) { - return false; - } - this.expiresAtMillis = start + this.tokenRefreshInMills; - this.token = newToken; - return true; - } - - @Override - public boolean keepRefreshed() { - return this.keepRefreshed; - } - - @Override - public boolean willSoonExpire() { - if (keepRefreshed()) { - return expiresAtMillis().get() - System.currentTimeMillis() - < tokenRefreshInMills().get() * EXPIRED_FACTOR; - } else { - return false; - } - } - - @Override - public Optional<Long> expiresAtMillis() { - return Optional.ofNullable(this.expiresAtMillis); - } - - @Override - public Optional<Long> tokenRefreshInMills() { - return Optional.ofNullable(this.tokenRefreshInMills); - } - - private static Long getExpirationInMills(String dateStr) { - try { - if (dateStr == null) { - return null; - } - LocalDateTime dateTime = LocalDateTime.parse(dateStr, TOKEN_DATE_FORMATTER); - return dateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java index 72d9ea5580..030dc396e1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java @@ -24,7 +24,6 @@ import org.apache.paimon.rest.RESTCatalogOptions; import java.util.regex.Matcher; import java.util.regex.Pattern; -import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_REFRESH_TIME; import static org.apache.paimon.rest.RESTCatalogOptions.URI; /** Factory for {@link DLFAuthProvider}. */ @@ -44,16 +43,14 @@ public class DLFAuthProviderFactory implements AuthProviderFactory { DLFTokenLoader dlfTokenLoader = DLFTokenLoaderFactory.createDLFTokenLoader( options.get(RESTCatalogOptions.DLF_TOKEN_LOADER), options); - long tokenRefreshInMills = options.get(TOKEN_REFRESH_TIME).toMillis(); - return DLFAuthProvider.buildRefreshToken(dlfTokenLoader, tokenRefreshInMills, region); + return DLFAuthProvider.fromTokenLoader(dlfTokenLoader, region); } else if (options.getOptional(RESTCatalogOptions.DLF_TOKEN_PATH).isPresent()) { DLFTokenLoader dlfTokenLoader = DLFTokenLoaderFactory.createDLFTokenLoader("local_file", options); - long tokenRefreshInMills = options.get(TOKEN_REFRESH_TIME).toMillis(); - return DLFAuthProvider.buildRefreshToken(dlfTokenLoader, tokenRefreshInMills, region); + return DLFAuthProvider.fromTokenLoader(dlfTokenLoader, region); } else if (options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_ID).isPresent() && options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET).isPresent()) { - return DLFAuthProvider.buildAKToken( + return DLFAuthProvider.fromAccessKey( options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID), options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET), options.get(RESTCatalogOptions.DLF_SECURITY_TOKEN), diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java index c73f32c990..3806b045d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java @@ -53,7 +53,7 @@ public class DLFECSTokenLoader implements DLFTokenLoader { .readTimeout(Duration.ofMinutes(3)) .build(); - private String ecsMetadataURL; + private final String ecsMetadataURL; private String roleName; @@ -70,6 +70,11 @@ public class DLFECSTokenLoader implements DLFTokenLoader { return getToken(ecsMetadataURL + roleName); } + @Override + public String description() { + return ecsMetadataURL; + } + private static String getRole(String url) { try { return getResponseBody(url); diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java index c1aea84280..22d0f4090d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java @@ -21,17 +21,12 @@ package org.apache.paimon.rest.auth; import org.apache.paimon.utils.FileIOUtils; import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.UncheckedIOException; import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER; /** DLF Token Loader for local file. */ public class DLFLocalFileTokenLoader implements DLFTokenLoader { - private static final long[] READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS = {1_000, 3_000, 5_000}; - private final String tokenFilePath; public DLFLocalFileTokenLoader(String tokenFilePath) { @@ -40,25 +35,32 @@ public class DLFLocalFileTokenLoader implements DLFTokenLoader { @Override public DLFToken loadToken() { - return readToken(tokenFilePath, 0); + return readToken(tokenFilePath); } - protected static DLFToken readToken(String tokenFilePath, int retryTimes) { - try { - File tokenFile = new File(tokenFilePath); - if (tokenFile.exists()) { - String tokenStr = FileIOUtils.readFileUtf8(tokenFile); + @Override + public String description() { + return tokenFilePath; + } + + protected static DLFToken readToken(String tokenFilePath) { + int retry = 1; + Exception lastException = null; + while (retry <= 5) { + try { + String tokenStr = FileIOUtils.readFileUtf8(new File(tokenFilePath)); return OBJECT_MAPPER.readValue(tokenStr, DLFToken.class); - } else if (retryTimes < READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS.length - 1) { - Thread.sleep(READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS[retryTimes]); - return readToken(tokenFilePath, retryTimes + 1); - } else { - throw new FileNotFoundException(tokenFilePath); + } catch (Exception e) { + lastException = e; + } + try { + Thread.sleep(retry * 1000L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } - } catch (IOException e) { - throw new UncheckedIOException(e); - } catch (InterruptedException e) { - throw new RuntimeException(e); + retry++; } + throw new RuntimeException(lastException); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java index c961dfe38b..37590cad8c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java @@ -19,15 +19,25 @@ package org.apache.paimon.rest.auth; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nullable; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.Objects; /** <b>Ali CLoud</b> DLF Token. */ @JsonIgnoreProperties(ignoreUnknown = true) public class DLFToken { + public static final DateTimeFormatter TOKEN_DATE_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"); + private static final String ACCESS_KEY_ID_FIELD_NAME = "AccessKeyId"; private static final String ACCESS_KEY_SECRET_FIELD_NAME = "AccessKeySecret"; private static final String SECURITY_TOKEN_FIELD_NAME = "SecurityToken"; @@ -42,19 +52,30 @@ public class DLFToken { @JsonProperty(SECURITY_TOKEN_FIELD_NAME) private final String securityToken; + @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty(EXPIRATION_FIELD_NAME) + @Nullable private final String expiration; + @JsonIgnore @Nullable private final Long expirationAtMills; + @JsonCreator public DLFToken( @JsonProperty(ACCESS_KEY_ID_FIELD_NAME) String accessKeyId, @JsonProperty(ACCESS_KEY_SECRET_FIELD_NAME) String accessKeySecret, @JsonProperty(SECURITY_TOKEN_FIELD_NAME) String securityToken, - @JsonProperty(EXPIRATION_FIELD_NAME) String expiration) { + @Nullable @JsonProperty(EXPIRATION_FIELD_NAME) String expiration) { this.accessKeyId = accessKeyId; this.accessKeySecret = accessKeySecret; this.securityToken = securityToken; this.expiration = expiration; + if (expiration == null) { + this.expirationAtMills = null; + } else { + LocalDateTime dateTime = LocalDateTime.parse(expiration, TOKEN_DATE_FORMATTER); + // Note: the date time is UTC time zone + this.expirationAtMills = dateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli(); + } } public String getAccessKeyId() { @@ -69,8 +90,9 @@ public class DLFToken { return securityToken; } - public String getExpiration() { - return expiration; + @Nullable + public Long getExpirationAtMills() { + return expirationAtMills; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java index fb2199e320..0b45284223 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java @@ -22,4 +22,6 @@ package org.apache.paimon.rest.auth; public interface DLFTokenLoader { DLFToken loadToken(); + + String description(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java index b76701845f..254eca75ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java @@ -25,18 +25,15 @@ import java.util.function.Function; public class RESTAuthFunction implements Function<RESTAuthParameter, Map<String, String>> { private final Map<String, String> initHeader; - private final AuthSession authSession; + private final AuthProvider authProvider; - public RESTAuthFunction(Map<String, String> initHeader, AuthSession authSession) { + public RESTAuthFunction(Map<String, String> initHeader, AuthProvider authProvider) { this.initHeader = initHeader; - this.authSession = authSession; + this.authProvider = authProvider; } @Override public Map<String, String> apply(RESTAuthParameter restAuthParameter) { - if (authSession != null) { - return authSession.getAuthProvider().header(initHeader, restAuthParameter); - } - return initHeader; + return authProvider.mergeAuthHeader(initHeader, restAuthParameter); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java index b1f68bd71e..dfdfe5f9b9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java @@ -19,7 +19,6 @@ package org.apache.paimon.rest; import org.apache.paimon.rest.auth.AuthProvider; -import org.apache.paimon.rest.auth.AuthSession; import org.apache.paimon.rest.auth.BearTokenAuthProvider; import org.apache.paimon.rest.auth.RESTAuthFunction; import org.apache.paimon.rest.auth.RESTAuthParameter; @@ -85,9 +84,8 @@ public class HttpClientTest { httpClient = new HttpClient(server.getBaseUrl()); httpClient.setErrorHandler(errorHandler); AuthProvider authProvider = new BearTokenAuthProvider(TOKEN); - AuthSession authSession = new AuthSession(authProvider); headers = new HashMap<>(); - restAuthFunction = new RESTAuthFunction(headers, authSession); + restAuthFunction = new RESTAuthFunction(headers, authProvider); } @After diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java index 69ea8a5185..5fcf1c9b84 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java @@ -99,7 +99,7 @@ class MockRESTCatalogTest extends RESTCatalogTest { String akSecret = "akSecret" + UUID.randomUUID(); String securityToken = "securityToken" + UUID.randomUUID(); String region = "cn-hangzhou"; - this.authProvider = DLFAuthProvider.buildAKToken(akId, akSecret, securityToken, region); + this.authProvider = DLFAuthProvider.fromAccessKey(akId, akSecret, securityToken, region); this.authMap = ImmutableMap.of( RESTCatalogOptions.TOKEN_PROVIDER.key(), AuthProviderEnum.DLF.identifier(), @@ -122,7 +122,7 @@ class MockRESTCatalogTest extends RESTCatalogTest { new Options( ImmutableMap.of( RESTCatalogOptions.DLF_TOKEN_PATH.key(), tokenPath))); - this.authProvider = DLFAuthProvider.buildRefreshToken(tokenLoader, 1000_000L, region); + this.authProvider = DLFAuthProvider.fromTokenLoader(tokenLoader, region); this.authMap = ImmutableMap.of( RESTCatalogOptions.TOKEN_PROVIDER.key(), AuthProviderEnum.DLF.identifier(), diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 6173e43361..655647ba42 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -243,7 +243,7 @@ public class RESTCatalogServer { resourcePath, parameters, request.getMethod(), data); String authToken = authProvider - .header(headers, restAuthParameter) + .mergeAuthHeader(headers, restAuthParameter) .get(AUTHORIZATION_HEADER_KEY); if (!authToken.equals(token)) { return new MockResponse().setResponseCode(401); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 08ba3d22f0..b567e67adb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -79,7 +79,7 @@ import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE; import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION; import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; import static org.apache.paimon.rest.RESTCatalog.PAGE_TOKEN; -import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER; +import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER; import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java similarity index 61% rename from paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java rename to paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java index 16285e734d..770dadab5f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java @@ -21,7 +21,6 @@ package org.apache.paimon.rest.auth; import org.apache.paimon.options.Options; import org.apache.paimon.rest.RESTCatalogOptions; import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.ThreadPoolUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -29,7 +28,6 @@ import org.apache.commons.io.FileUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.mockito.Mockito; import java.io.File; import java.io.IOException; @@ -37,10 +35,9 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.UUID; -import java.util.concurrent.ScheduledExecutorService; +import static org.apache.paimon.rest.RESTCatalog.TOKEN_EXPIRATION_SAFE_TIME_MILLIS; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_ID; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_SECRET; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_REGION; @@ -50,21 +47,16 @@ import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_LOADER; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_PATH; import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN; -import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_REFRESH_TIME; -import static org.apache.paimon.rest.auth.AuthSession.MAX_REFRESH_WINDOW_MILLIS; -import static org.apache.paimon.rest.auth.AuthSession.MIN_REFRESH_WAIT_MILLIS; -import static org.apache.paimon.rest.auth.AuthSession.REFRESH_NUM_RETRIES; +import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER; import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_AUTHORIZATION_HEADER_KEY; import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_DATE_HEADER_KEY; -import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER; +import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -/** Test for {@link AuthSession}. */ -public class AuthSessionTest { +/** Test for {@link AuthProvider}. */ +public class AuthProviderTest { @Rule public TemporaryFolder folder = new TemporaryFolder(); private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new ObjectMapper(); @@ -77,36 +69,28 @@ public class AuthSessionTest { initialHeaders.put("k2", "v2"); Options options = new Options(); options.set(TOKEN.key(), token); - AuthProvider authProvider = - AuthProviderFactory.createAuthProvider(AuthProviderEnum.BEAR.identifier(), options); - AuthSession session = new AuthSession(authProvider); - Map<String, String> headers = session.getAuthProvider().header(initialHeaders, null); + options.set(TOKEN_PROVIDER.key(), "bear"); + AuthProvider authProvider = AuthProviderFactory.createAuthProvider(options); + Map<String, String> headers = authProvider.mergeAuthHeader(initialHeaders, null); assertEquals( headers.get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY), "Bearer " + token); } @Test - public void testRefreshDLFAuthTokenFileAuthProvider() throws IOException, InterruptedException { + public void testRefreshDLFAuthTokenFileAuthProvider() throws IOException { String fileName = UUID.randomUUID().toString(); Pair<File, String> tokenFile2Token = generateTokenAndWriteToFile(fileName); String theFirstGenerateToken = tokenFile2Token.getRight(); File tokenFile = tokenFile2Token.getLeft(); - long tokenRefreshInMills = 1000; - AuthProvider authProvider = - generateDLFAuthProvider(Optional.of(tokenRefreshInMills), fileName, "serverUrl"); - ScheduledExecutorService executor = - ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"); - AuthSession session = AuthSession.fromRefreshAuthProvider(executor, authProvider); - DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider(); + DLFAuthProvider authProvider = generateDLFAuthProvider(fileName, "serverUrl"); String theFirstFetchToken = - OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); assertEquals(theFirstFetchToken, theFirstGenerateToken); tokenFile.delete(); tokenFile2Token = generateTokenAndWriteToFile(fileName); String theSecondGenerateToken = tokenFile2Token.getRight(); - Thread.sleep(tokenRefreshInMills * 2); String theSecondFetchToken = - OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); // if the second fetch token is not equal to the first fetch token, it means refresh success // as refresh maybe fail in test environment, so we need to check whether refresh success if (!theSecondFetchToken.equals(theFirstFetchToken)) { @@ -115,62 +99,23 @@ public class AuthSessionTest { } @Test - public void testRefreshAuthProviderIsSoonExpire() throws IOException, InterruptedException { + public void testRefreshAuthProviderIsSoonExpire() throws IOException { String fileName = UUID.randomUUID().toString(); Pair<File, String> tokenFile2Token = generateTokenAndWriteToFile(fileName); String token = tokenFile2Token.getRight(); File tokenFile = tokenFile2Token.getLeft(); - long tokenRefreshInMills = 5000L; - AuthProvider authProvider = - generateDLFAuthProvider(Optional.of(tokenRefreshInMills), fileName, "serverUrl"); - AuthSession session = AuthSession.fromRefreshAuthProvider(null, authProvider); - DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider(); - String authToken = OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + DLFAuthProvider authProvider = generateDLFAuthProvider(fileName, "serverUrl"); + String authToken = OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); assertEquals(token, authToken); - Thread.sleep((long) (tokenRefreshInMills * (1 - DLFAuthProvider.EXPIRED_FACTOR)) + 10L); tokenFile.delete(); tokenFile2Token = generateTokenAndWriteToFile(fileName); token = tokenFile2Token.getRight(); tokenFile = tokenFile2Token.getLeft(); FileUtils.writeStringToFile(tokenFile, token); - dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider(); - authToken = OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + authToken = OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); assertEquals(token, authToken); } - @Test - public void testRetryWhenRefreshFail() throws Exception { - AuthProvider authProvider = Mockito.mock(DLFAuthProvider.class); - long expiresAtMillis = System.currentTimeMillis() - 1000L; - when(authProvider.expiresAtMillis()).thenReturn(Optional.of(expiresAtMillis)); - when(authProvider.tokenRefreshInMills()).thenReturn(Optional.of(50L)); - when(authProvider.keepRefreshed()).thenReturn(true); - when(authProvider.refresh()).thenReturn(false); - AuthSession session = AuthSession.fromRefreshAuthProvider(null, authProvider); - AuthSession.scheduleTokenRefresh( - ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"), - session, - expiresAtMillis); - Thread.sleep(10_000L); - verify(authProvider, Mockito.times(REFRESH_NUM_RETRIES + 1)).refresh(); - } - - @Test - public void testGetTimeToWaitByExpiresInMills() { - long expiresInMillis = -100L; - long timeToWait = AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis); - assertEquals(MIN_REFRESH_WAIT_MILLIS, timeToWait); - expiresInMillis = (long) (MAX_REFRESH_WINDOW_MILLIS * 0.5); - timeToWait = AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis); - assertEquals(MIN_REFRESH_WAIT_MILLIS, timeToWait); - expiresInMillis = MAX_REFRESH_WINDOW_MILLIS; - timeToWait = AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis); - assertEquals(timeToWait, MIN_REFRESH_WAIT_MILLIS); - expiresInMillis = MAX_REFRESH_WINDOW_MILLIS * 2L; - timeToWait = AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis); - assertEquals(timeToWait, MAX_REFRESH_WINDOW_MILLIS); - } - @Test public void testCreateDLFAuthProviderByStsToken() throws IOException { Options options = new Options(); @@ -182,11 +127,10 @@ public class AuthSessionTest { options.set(DLF_ACCESS_KEY_SECRET.key(), token.getAccessKeySecret()); options.set(DLF_SECURITY_TOKEN.key(), token.getSecurityToken()); options.set(DLF_REGION.key(), "cn-hangzhou"); - AuthProvider authProvider = - AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), options); - AuthSession session = AuthSession.fromRefreshAuthProvider(null, authProvider); - DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider(); - String authToken = OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + options.set(TOKEN_PROVIDER, "dlf"); + DLFAuthProvider authProvider = + (DLFAuthProvider) AuthProviderFactory.createAuthProvider(options); + String authToken = OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); assertEquals(OBJECT_MAPPER_INSTANCE.writeValueAsString(token), authToken); } @@ -199,11 +143,10 @@ public class AuthSessionTest { options.set(DLF_ACCESS_KEY_ID.key(), token.getAccessKeyId()); options.set(DLF_ACCESS_KEY_SECRET.key(), token.getAccessKeySecret()); options.set(DLF_REGION.key(), "cn-hangzhou"); - AuthProvider authProvider = - AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), options); - AuthSession session = AuthSession.fromRefreshAuthProvider(null, authProvider); - DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider(); - String authToken = OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + options.set(TOKEN_PROVIDER, "dlf"); + DLFAuthProvider authProvider = + (DLFAuthProvider) AuthProviderFactory.createAuthProvider(options); + String authToken = OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); assertEquals(OBJECT_MAPPER_INSTANCE.writeValueAsString(token), authToken); } @@ -212,55 +155,43 @@ public class AuthSessionTest { String fileName = UUID.randomUUID().toString(); Pair<File, String> tokenFile2Token = generateTokenAndWriteToFile(fileName); String token = tokenFile2Token.getRight(); - AuthProvider authProvider = - generateDLFAuthProvider(Optional.empty(), fileName, "serverUrl"); - ScheduledExecutorService executor = - ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"); - AuthSession session = AuthSession.fromRefreshAuthProvider(executor, authProvider); - DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider(); - String authToken = OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + DLFAuthProvider authProvider = generateDLFAuthProvider(fileName, "serverUrl"); + String authToken = OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); assertEquals(authToken, token); } @Test public void testCreateDLFAuthProviderWithoutNeedConf() { + Options options = new Options(); + options.set(TOKEN_PROVIDER, "dlf"); assertThrows( IllegalArgumentException.class, - () -> - AuthProviderFactory.createAuthProvider( - AuthProviderEnum.DLF.identifier(), new Options())); + () -> AuthProviderFactory.createAuthProvider(new Options())); } @Test - public void testCreateDlfAuthProviderByDLFTokenLoader() - throws IOException, InterruptedException { + public void testCreateDlfAuthProviderByDLFTokenLoader() throws IOException { String fileName = UUID.randomUUID().toString(); Pair<File, String> tokenFile2Token = generateTokenAndWriteToFile(fileName); String theFirstGenerateToken = tokenFile2Token.getRight(); File tokenFile = tokenFile2Token.getLeft(); - long tokenRefreshInMills = 1000; // create options with token loader Options options = new Options(); options.set(DLF_TOKEN_LOADER.key(), "local_file"); options.set(DLF_TOKEN_PATH.key(), folder.getRoot().getPath() + "/" + fileName); options.set(RESTCatalogOptions.URI.key(), "serverUrl"); options.set(DLF_REGION.key(), "cn-hangzhou"); - options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms"); - AuthProvider authProvider = - AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), options); - ScheduledExecutorService executor = - ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"); - AuthSession session = AuthSession.fromRefreshAuthProvider(executor, authProvider); - DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider(); + options.set(TOKEN_PROVIDER, "dlf"); + DLFAuthProvider authProvider = + (DLFAuthProvider) AuthProviderFactory.createAuthProvider(options); String theFirstFetchToken = - OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); assertEquals(theFirstFetchToken, theFirstGenerateToken); tokenFile.delete(); tokenFile2Token = generateTokenAndWriteToFile(fileName); String theSecondGenerateToken = tokenFile2Token.getRight(); - Thread.sleep(tokenRefreshInMills * 2); String theSecondFetchToken = - OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); // if the second fetch token is not equal to the first fetch token, it means refresh success // as refresh maybe fail in test environment, so we need to check whether refresh success if (!theSecondFetchToken.equals(theFirstFetchToken)) { @@ -269,8 +200,7 @@ public class AuthSessionTest { } @Test - public void testCreateDlfAuthProviderByCustomDLFTokenLoader() - throws IOException, InterruptedException { + public void testCreateDlfAuthProviderByCustomDLFTokenLoader() { DLFToken customToken = generateToken(); // create options with custom token loader Options options = new Options(); @@ -280,21 +210,17 @@ public class AuthSessionTest { options.set(DLF_SECURITY_TOKEN.key(), customToken.getSecurityToken()); options.set(RESTCatalogOptions.URI.key(), "serverUrl"); options.set(DLF_REGION.key(), "cn-hangzhou"); - AuthProvider authProvider = - AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), options); - ScheduledExecutorService executor = - ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"); - AuthSession session = AuthSession.fromRefreshAuthProvider(executor, authProvider); - DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider(); - DLFToken fetchToken = dlfAuthProvider.token; + options.set(TOKEN_PROVIDER, "dlf"); + DLFAuthProvider authProvider = + (DLFAuthProvider) AuthProviderFactory.createAuthProvider(options); + DLFToken fetchToken = authProvider.getFreshToken(); assertEquals(fetchToken.getAccessKeyId(), customToken.getAccessKeyId()); assertEquals(fetchToken.getAccessKeySecret(), customToken.getAccessKeySecret()); assertEquals(fetchToken.getSecurityToken(), customToken.getSecurityToken()); } @Test - public void testCreateDlfAuthProviderByECSTokenProvider() - throws IOException, InterruptedException { + public void testCreateDlfAuthProviderByECSTokenProvider() throws IOException { MockECSMetadataService mockECSMetadataService = new MockECSMetadataService("EcsTestRole"); mockECSMetadataService.start(); try { @@ -302,7 +228,6 @@ public class AuthSessionTest { mockECSMetadataService.setMockToken(theFirstMockToken); String theFirstMockTokenStr = OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken); - long tokenRefreshInMills = 1000; // create options with token loader Options options = new Options(); options.set(DLF_TOKEN_LOADER.key(), "ecs"); @@ -311,34 +236,41 @@ public class AuthSessionTest { mockECSMetadataService.getUrl() + "latest/meta-data/Ram/security-credentials/"); options.set(RESTCatalogOptions.URI.key(), "serverUrl"); options.set(DLF_REGION.key(), "cn-hangzhou"); - options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms"); - AuthProvider authProvider = - AuthProviderFactory.createAuthProvider( - AuthProviderEnum.DLF.identifier(), options); - ScheduledExecutorService executor = - ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"); - AuthSession session = AuthSession.fromRefreshAuthProvider(executor, authProvider); - DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider(); + options.set(TOKEN_PROVIDER, "dlf"); + + DLFAuthProvider authProvider = + (DLFAuthProvider) AuthProviderFactory.createAuthProvider(options); + + // first token String theFirstFetchTokenStr = - OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr); - DLFToken theSecondMockToken = generateToken(); + // second token + DLFToken theSecondMockToken = + generateToken( + ZonedDateTime.now(ZoneOffset.UTC) + .plusSeconds(TOKEN_EXPIRATION_SAFE_TIME_MILLIS * 2 / 1000)); String theSecondMockTokenStr = OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken); mockECSMetadataService.setMockToken(theSecondMockToken); - Thread.sleep(tokenRefreshInMills * 2); String theSecondFetchTokenStr = - OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr); + + // third token, should not refresh + DLFToken theThirdMockToken = generateToken(); + mockECSMetadataService.setMockToken(theThirdMockToken); + String theThirdFetchTokenStr = + OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); + assertEquals(theThirdFetchTokenStr, theSecondMockTokenStr); } finally { mockECSMetadataService.shutdown(); } } @Test - public void testCreateDlfAuthProviderByECSTokenProviderWithDefineRole() - throws IOException, InterruptedException { + public void testCreateDlfAuthProviderByECSTokenProviderWithDefineRole() throws IOException { MockECSMetadataService mockECSMetadataService = new MockECSMetadataService("CustomRole"); mockECSMetadataService.start(); try { @@ -346,7 +278,6 @@ public class AuthSessionTest { mockECSMetadataService.setMockToken(theFirstMockToken); String theFirstMockTokenStr = OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken); - long tokenRefreshInMills = 1000; // create options with token loader Options options = new Options(); options.set(DLF_TOKEN_LOADER.key(), "ecs"); @@ -356,25 +287,19 @@ public class AuthSessionTest { options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole"); options.set(RESTCatalogOptions.URI.key(), "serverUrl"); options.set(DLF_REGION.key(), "cn-hangzhou"); - options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms"); - AuthProvider authProvider = - AuthProviderFactory.createAuthProvider( - AuthProviderEnum.DLF.identifier(), options); - ScheduledExecutorService executor = - ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"); - AuthSession session = AuthSession.fromRefreshAuthProvider(executor, authProvider); - DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider(); + options.set(TOKEN_PROVIDER, "dlf"); + DLFAuthProvider authProvider = + (DLFAuthProvider) AuthProviderFactory.createAuthProvider(options); String theFirstFetchTokenStr = - OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr); DLFToken theSecondMockToken = generateToken(); String theSecondMockTokenStr = OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken); mockECSMetadataService.setMockToken(theSecondMockToken); - Thread.sleep(tokenRefreshInMills * 2); String theSecondFetchTokenStr = - OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token); + OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken()); assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr); } finally { mockECSMetadataService.shutdown(); @@ -382,8 +307,7 @@ public class AuthSessionTest { } @Test - public void testCreateDlfAuthProviderByECSTokenProviderWithInvalidRole() - throws IOException, InterruptedException { + public void testCreateDlfAuthProviderByECSTokenProviderWithInvalidRole() throws IOException { MockECSMetadataService mockECSMetadataService = new MockECSMetadataService("EcsTestRole"); mockECSMetadataService.start(); try { @@ -398,13 +322,10 @@ public class AuthSessionTest { options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole"); options.set(RESTCatalogOptions.URI.key(), "serverUrl"); options.set(DLF_REGION.key(), "cn-hangzhou"); - assertThrows( - RuntimeException.class, - () -> { - AuthProvider authProvider = - AuthProviderFactory.createAuthProvider( - AuthProviderEnum.DLF.identifier(), options); - }); + options.set(TOKEN_PROVIDER, "dlf"); + DLFAuthProvider authProvider = + (DLFAuthProvider) AuthProviderFactory.createAuthProvider(options); + assertThrows(RuntimeException.class, authProvider::getFreshToken); } finally { mockECSMetadataService.shutdown(); } @@ -416,7 +337,7 @@ public class AuthSessionTest { Pair<File, String> tokenFile2Token = generateTokenAndWriteToFile(fileName); String tokenStr = tokenFile2Token.getRight(); String serverUrl = "https://dlf-cn-hangzhou.aliyuncs.com"; - AuthProvider authProvider = generateDLFAuthProvider(Optional.empty(), fileName, serverUrl); + AuthProvider authProvider = generateDLFAuthProvider(fileName, serverUrl); DLFToken token = OBJECT_MAPPER_INSTANCE.readValue(tokenStr, DLFToken.class); Map<String, String> parameters = new HashMap<>(); parameters.put("k1", "v1"); @@ -424,7 +345,8 @@ public class AuthSessionTest { String data = "data"; RESTAuthParameter restAuthParameter = new RESTAuthParameter("/path", parameters, "method", "data"); - Map<String, String> header = authProvider.header(new HashMap<>(), restAuthParameter); + Map<String, String> header = + authProvider.mergeAuthHeader(new HashMap<>(), restAuthParameter); String authorization = header.get(DLF_AUTHORIZATION_HEADER_KEY); String[] credentials = authorization.split(",")[0].split(" ")[1].split("/"); String dateTime = header.get(DLF_DATE_HEADER_KEY); @@ -456,8 +378,7 @@ public class AuthSessionTest { private Pair<File, String> generateTokenAndWriteToFile(String fileName) throws IOException { File tokenFile = folder.newFile(fileName); - ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); - String expiration = now.format(TOKEN_DATE_FORMATTER); + String expiration = ZonedDateTime.now(ZoneOffset.UTC).format(TOKEN_DATE_FORMATTER); String secret = UUID.randomUUID().toString(); DLFToken token = new DLFToken("accessKeyId", secret, "securityToken", expiration); String tokenStr = OBJECT_MAPPER_INSTANCE.writeValueAsString(token); @@ -466,23 +387,23 @@ public class AuthSessionTest { } private DLFToken generateToken() { + return generateToken(ZonedDateTime.now(ZoneOffset.UTC)); + } + + private DLFToken generateToken(ZonedDateTime expireTime) { String accessKeyId = UUID.randomUUID().toString(); String accessKeySecret = UUID.randomUUID().toString(); String securityToken = UUID.randomUUID().toString(); - ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); - String expiration = now.format(TOKEN_DATE_FORMATTER); + String expiration = expireTime.format(TOKEN_DATE_FORMATTER); return new DLFToken(accessKeyId, accessKeySecret, securityToken, expiration); } - private AuthProvider generateDLFAuthProvider( - Optional<Long> tokenRefreshInMillsOpt, String fileName, String serverUrl) { + private DLFAuthProvider generateDLFAuthProvider(String fileName, String serverUrl) { Options options = new Options(); options.set(DLF_TOKEN_PATH.key(), folder.getRoot().getPath() + "/" + fileName); options.set(RESTCatalogOptions.URI.key(), serverUrl); options.set(DLF_REGION.key(), "cn-hangzhou"); - tokenRefreshInMillsOpt.ifPresent( - tokenRefreshInMills -> - options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms")); - return AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), options); + options.set(TOKEN_PROVIDER, "dlf"); + return (DLFAuthProvider) AuthProviderFactory.createAuthProvider(options); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java b/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java index 279c70080b..139df16dea 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java @@ -21,7 +21,7 @@ package org.apache.paimon.rest.auth; import java.time.ZoneOffset; import java.time.ZonedDateTime; -import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER; +import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER; /** DLF Token Loader for custom. */ public class CustomTestDLFTokenLoader implements DLFTokenLoader { @@ -39,4 +39,9 @@ public class CustomTestDLFTokenLoader implements DLFTokenLoader { public DLFToken loadToken() { return token; } + + @Override + public String description() { + return "custom"; + } }
