This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 704291824e [core] Support auth in REST Catalog (#4648)
704291824e is described below

commit 704291824ed39c1d98d57c959ccbf333d05f6c2c
Author: lining <[email protected]>
AuthorDate: Tue Dec 10 10:07:30 2024 +0800

    [core] Support auth in REST Catalog (#4648)
---
 .../org/apache/paimon/utils/ThreadPoolUtils.java   |  10 ++
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  51 ++++++--
 .../paimon/rest/RESTCatalogInternalOptions.java    |   5 +
 .../org/apache/paimon/rest/RESTCatalogOptions.java |  24 +++-
 .../org/apache/paimon/rest/auth/AuthSession.java   | 137 +++++++++++++++++++++
 .../BaseBearTokenCredentialsProvider.java}         |  26 ++--
 .../BearTokenCredentialsProvider.java}             |  28 +++--
 .../auth/BearTokenCredentialsProviderFactory.java  |  43 +++++++
 .../auth/BearTokenFileCredentialsProvider.java     | 106 ++++++++++++++++
 .../BearTokenFileCredentialsProviderFactory.java   |  48 ++++++++
 .../CredentialsProvider.java}                      |  43 +++++--
 .../rest/auth/CredentialsProviderFactory.java      |  54 ++++++++
 .../CredentialsProviderType.java}                  |  16 +--
 .../services/org.apache.paimon.factories.Factory   |   2 +
 .../org/apache/paimon/rest/HttpClientTest.java     |   9 +-
 .../apache/paimon/rest/auth/AuthSessionTest.java   | 130 +++++++++++++++++++
 .../rest/auth/CredentialsProviderFactoryTest.java  | 131 ++++++++++++++++++++
 17 files changed, 800 insertions(+), 63 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
index f8959def67..c64b9e26ea 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
@@ -20,6 +20,7 @@ package org.apache.paimon.utils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import javax.annotation.Nullable;
 
@@ -36,6 +37,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -76,6 +79,13 @@ public class ThreadPoolUtils {
         return executor;
     }
 
+    public static ScheduledExecutorService createScheduledThreadPool(
+            int threadNum, String namePrefix) {
+        return new ScheduledThreadPoolExecutor(
+                threadNum,
+                new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix).build());
+    }
+
     /** This method aims to parallel process tasks with memory control and 
sequentially. */
     public static <T, U> Iterable<T> sequentialBatchedExecute(
             ThreadPoolExecutor executor,
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index c964008313..e18946b337 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.rest;
 
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
@@ -27,37 +26,42 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.auth.AuthSession;
+import org.apache.paimon.rest.auth.CredentialsProvider;
+import org.apache.paimon.rest.auth.CredentialsProviderFactory;
 import org.apache.paimon.rest.responses.ConfigResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import 
org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static 
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
 
 /** A catalog implementation for REST. */
 public class RESTCatalog implements Catalog {
     private RESTClient client;
-    private String token;
     private ResourcePaths resourcePaths;
     private Map<String, String> options;
     private Map<String, String> baseHeader;
+    // a lazy thread pool for token refresh
+    private final AuthSession catalogAuth;
+    private volatile ScheduledExecutorService refreshExecutor = null;
 
     private static final ObjectMapper objectMapper = RESTObjectMapper.create();
-    static final String AUTH_HEADER = "Authorization";
-    static final String AUTH_HEADER_VALUE_FORMAT = "Bearer %s";
 
     public RESTCatalog(Options options) {
         if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) {
             throw new IllegalArgumentException("Can not config warehouse in 
RESTCatalog.");
         }
         String uri = options.get(RESTCatalogOptions.URI);
-        token = options.get(RESTCatalogOptions.TOKEN);
         Optional<Duration> connectTimeout =
                 options.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT);
         Optional<Duration> readTimeout = 
options.getOptional(RESTCatalogOptions.READ_TIMEOUT);
@@ -71,12 +75,21 @@ public class RESTCatalog implements Catalog {
                         threadPoolSize,
                         DefaultErrorHandler.getInstance());
         this.client = new HttpClient(httpClientOptions);
-        Map<String, String> authHeaders =
-                ImmutableMap.of(AUTH_HEADER, 
String.format(AUTH_HEADER_VALUE_FORMAT, token));
+        this.baseHeader = configHeaders(options.toMap());
+        CredentialsProvider credentialsProvider =
+                CredentialsProviderFactory.createCredentialsProvider(
+                        options, RESTCatalog.class.getClassLoader());
+        if (credentialsProvider.keepRefreshed()) {
+            this.catalogAuth =
+                    AuthSession.fromRefreshCredentialsProvider(
+                            tokenRefreshExecutor(), this.baseHeader, 
credentialsProvider);
+
+        } else {
+            this.catalogAuth = new AuthSession(this.baseHeader, 
credentialsProvider);
+        }
         Map<String, String> initHeaders =
-                RESTUtil.merge(configHeaders(options.toMap()), authHeaders);
+                RESTUtil.merge(configHeaders(options.toMap()), 
this.catalogAuth.getHeaders());
         this.options = fetchOptionsFromServer(initHeaders, options.toMap());
-        this.baseHeader = configHeaders(this.options());
         this.resourcePaths =
                 ResourcePaths.forCatalogProperties(
                         this.options.get(RESTCatalogInternalOptions.PREFIX));
@@ -187,11 +200,27 @@ public class RESTCatalog implements Catalog {
     Map<String, String> fetchOptionsFromServer(
             Map<String, String> headers, Map<String, String> clientProperties) 
{
         ConfigResponse response =
-                client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, 
headers);
+                client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, 
headers());
         return response.merge(clientProperties);
     }
 
     private static Map<String, String> configHeaders(Map<String, String> 
properties) {
         return RESTUtil.extractPrefixMap(properties, "header.");
     }
+
+    private Map<String, String> headers() {
+        return catalogAuth.getHeaders();
+    }
+
+    private ScheduledExecutorService tokenRefreshExecutor() {
+        if (refreshExecutor == null) {
+            synchronized (this) {
+                if (refreshExecutor == null) {
+                    this.refreshExecutor = createScheduledThreadPool(1, 
"token-refresh-thread");
+                }
+            }
+        }
+
+        return refreshExecutor;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
index cf61caa20e..62a8bf134a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
@@ -28,4 +28,9 @@ public class RESTCatalogInternalOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("REST Catalog uri's prefix.");
+    public static final ConfigOption<String> CREDENTIALS_PROVIDER =
+            ConfigOptions.key("credentials-provider")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("REST Catalog auth credentials 
provider.");
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 6155b89375..8f7bea91dc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -30,11 +30,6 @@ public class RESTCatalogOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("REST Catalog server's uri.");
-    public static final ConfigOption<String> TOKEN =
-            ConfigOptions.key("token")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("REST Catalog server's auth token.");
     public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
             ConfigOptions.key("rest.client.connection-timeout")
                     .durationType()
@@ -50,4 +45,23 @@ public class RESTCatalogOptions {
                     .intType()
                     .defaultValue(1)
                     .withDescription("REST Catalog http client thread num.");
+    public static final ConfigOption<String> TOKEN =
+            ConfigOptions.key("token")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("REST Catalog auth token.");
+    public static final ConfigOption<Duration> TOKEN_EXPIRATION_TIME =
+            ConfigOptions.key("token.expiration-time")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    .withDescription(
+                            "REST Catalog auth token expires time.The token 
generates system refresh frequency is t1,"
+                                    + " the token expires time is t2, we need 
to guarantee that t2 > t1,"
+                                    + " the token validity time is [t2 - t1, 
t2],"
+                                    + " and the expires time defined here 
needs to be less than (t2 - t1)");
+    public static final ConfigOption<String> TOKEN_PROVIDER_PATH =
+            ConfigOptions.key("token.provider.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("REST Catalog auth token provider path.");
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
new file mode 100644
index 0000000000..74efb8508a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.rest.RESTUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** Auth session. */
+public class AuthSession {
+
+    static final int TOKEN_REFRESH_NUM_RETRIES = 5;
+    private static final Logger log = 
LoggerFactory.getLogger(AuthSession.class);
+    private static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes
+    private static final long MIN_REFRESH_WAIT_MILLIS = 10;
+    private final CredentialsProvider credentialsProvider;
+    private volatile Map<String, String> headers;
+
+    public AuthSession(Map<String, String> headers, CredentialsProvider 
credentialsProvider) {
+        this.headers = headers;
+        this.credentialsProvider = credentialsProvider;
+    }
+
+    public static AuthSession fromRefreshCredentialsProvider(
+            ScheduledExecutorService executor,
+            Map<String, String> headers,
+            CredentialsProvider credentialsProvider) {
+        AuthSession session = new AuthSession(headers, credentialsProvider);
+
+        long startTimeMillis = System.currentTimeMillis();
+        Optional<Long> expiresAtMillisOpt = 
credentialsProvider.expiresAtMillis();
+
+        // when init session if credentials expire time is in the past, 
refresh it and update
+        // expiresAtMillis
+        if (expiresAtMillisOpt.isPresent() && expiresAtMillisOpt.get() <= 
startTimeMillis) {
+            boolean refreshSuccessful = session.refresh();
+            if (refreshSuccessful) {
+                expiresAtMillisOpt = 
session.credentialsProvider.expiresAtMillis();
+            }
+        }
+
+        if (null != executor && expiresAtMillisOpt.isPresent()) {
+            scheduleTokenRefresh(executor, session, expiresAtMillisOpt.get());
+        }
+
+        return session;
+    }
+
+    public Map<String, String> getHeaders() {
+        if (this.credentialsProvider.keepRefreshed() && 
this.credentialsProvider.willSoonExpire()) {
+            refresh();
+        }
+        return headers;
+    }
+
+    @VisibleForTesting
+    static void scheduleTokenRefresh(
+            ScheduledExecutorService executor, AuthSession session, long 
expiresAtMillis) {
+        scheduleTokenRefresh(executor, session, expiresAtMillis, 0);
+    }
+
+    private static void scheduleTokenRefresh(
+            ScheduledExecutorService executor,
+            AuthSession session,
+            long expiresAtMillis,
+            int retryTimes) {
+        if (retryTimes < TOKEN_REFRESH_NUM_RETRIES) {
+            long expiresInMillis = expiresAtMillis - 
System.currentTimeMillis();
+            // how much ahead of time to start the refresh to allow it to 
complete
+            long refreshWindowMillis = Math.min(expiresInMillis, 
MAX_REFRESH_WINDOW_MILLIS);
+            // how much time to wait before expiration
+            long waitIntervalMillis = expiresInMillis - refreshWindowMillis;
+            // how much time to actually wait
+            long timeToWait = Math.max(waitIntervalMillis, 
MIN_REFRESH_WAIT_MILLIS);
+
+            executor.schedule(
+                    () -> {
+                        long refreshStartTime = System.currentTimeMillis();
+                        boolean isSuccessful = session.refresh();
+                        if (isSuccessful) {
+                            scheduleTokenRefresh(
+                                    executor,
+                                    session,
+                                    refreshStartTime
+                                            + 
session.credentialsProvider.expiresInMills().get(),
+                                    0);
+                        } else {
+                            scheduleTokenRefresh(
+                                    executor, session, expiresAtMillis, 
retryTimes + 1);
+                        }
+                    },
+                    timeToWait,
+                    TimeUnit.MILLISECONDS);
+        } else {
+            log.warn("Failed to refresh token after {} retries.", 
TOKEN_REFRESH_NUM_RETRIES);
+        }
+    }
+
+    public Boolean refresh() {
+        if (this.credentialsProvider.supportRefresh()
+                && this.credentialsProvider.keepRefreshed()
+                && this.credentialsProvider.expiresInMills().isPresent()) {
+            boolean isSuccessful = this.credentialsProvider.refresh();
+            if (isSuccessful) {
+                Map<String, String> currentHeaders = this.headers;
+                this.headers =
+                        RESTUtil.merge(currentHeaders, 
this.credentialsProvider.authHeader());
+            }
+            return isSuccessful;
+        }
+
+        return false;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BaseBearTokenCredentialsProvider.java
similarity index 58%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/auth/BaseBearTokenCredentialsProvider.java
index cf61caa20e..d3df878261 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BaseBearTokenCredentialsProvider.java
@@ -16,16 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.rest;
+package org.apache.paimon.rest.auth;
 
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
-/** Internal options for REST Catalog. */
-public class RESTCatalogInternalOptions {
-    public static final ConfigOption<String> PREFIX =
-            ConfigOptions.key("prefix")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("REST Catalog uri's prefix.");
+import java.util.Map;
+
+/** Base bear token credentials provider. */
+public abstract class BaseBearTokenCredentialsProvider implements 
CredentialsProvider {
+
+    private static final String AUTHORIZATION_HEADER = "Authorization";
+    private static final String BEARER_PREFIX = "Bearer ";
+
+    @Override
+    public Map<String, String> authHeader() {
+        return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token());
+    }
+
+    abstract String token();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProvider.java
similarity index 64%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProvider.java
index cf61caa20e..89228fe10b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProvider.java
@@ -16,16 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.rest;
+package org.apache.paimon.rest.auth;
 
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
+/** credentials provider for bear token. */
+public class BearTokenCredentialsProvider extends 
BaseBearTokenCredentialsProvider {
 
-/** Internal options for REST Catalog. */
-public class RESTCatalogInternalOptions {
-    public static final ConfigOption<String> PREFIX =
-            ConfigOptions.key("prefix")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("REST Catalog uri's prefix.");
+    private final String token;
+
+    public BearTokenCredentialsProvider(String token) {
+        this.token = token;
+    }
+
+    @Override
+    String token() {
+        return this.token;
+    }
+
+    @Override
+    public boolean refresh() {
+        return true;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProviderFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProviderFactory.java
new file mode 100644
index 0000000000..e63ac5606b
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProviderFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.utils.StringUtils;
+
+/** factory for create {@link BearTokenCredentialsProvider}. */
+public class BearTokenCredentialsProviderFactory implements 
CredentialsProviderFactory {
+
+    @Override
+    public String identifier() {
+        return CredentialsProviderType.BEAR_TOKEN.name();
+    }
+
+    @Override
+    public CredentialsProvider create(Options options) {
+        if (options.getOptional(RESTCatalogOptions.TOKEN)
+                .map(StringUtils::isNullOrWhitespaceOnly)
+                .orElse(true)) {
+            throw new IllegalArgumentException(
+                    RESTCatalogOptions.TOKEN.key() + " is required and not 
empty");
+        }
+        return new 
BearTokenCredentialsProvider(options.get(RESTCatalogOptions.TOKEN));
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProvider.java
new file mode 100644
index 0000000000..d479caa67f
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Optional;
+
+/** credentials provider for get bear token from file. */
+public class BearTokenFileCredentialsProvider extends 
BaseBearTokenCredentialsProvider {
+
+    public static final double EXPIRED_FACTOR = 0.4;
+
+    private final String tokenFilePath;
+    private String token;
+    private boolean keepRefreshed = false;
+    private Long expiresAtMillis = null;
+    private Long expiresInMills = null;
+
+    public BearTokenFileCredentialsProvider(String tokenFilePath) {
+        this.tokenFilePath = tokenFilePath;
+        this.token = getTokenFromFile();
+    }
+
+    public BearTokenFileCredentialsProvider(String tokenFilePath, Long 
expiresInMills) {
+        this(tokenFilePath);
+        this.keepRefreshed = true;
+        this.expiresAtMillis = -1L;
+        this.expiresInMills = expiresInMills;
+    }
+
+    @Override
+    String token() {
+        return this.token;
+    }
+
+    @Override
+    public boolean refresh() {
+        long start = System.currentTimeMillis();
+        String newToken = getTokenFromFile();
+        if (StringUtils.isNullOrWhitespaceOnly(newToken)) {
+            return false;
+        }
+        this.expiresAtMillis = start + this.expiresInMills;
+        this.token = newToken;
+        return true;
+    }
+
+    @Override
+    public boolean supportRefresh() {
+        return true;
+    }
+
+    @Override
+    public boolean keepRefreshed() {
+        return this.keepRefreshed;
+    }
+
+    @Override
+    public boolean willSoonExpire() {
+        if (keepRefreshed()) {
+            return expiresAtMillis().get() - System.currentTimeMillis()
+                    < expiresInMills().get() * EXPIRED_FACTOR;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public Optional<Long> expiresAtMillis() {
+        return Optional.ofNullable(this.expiresAtMillis);
+    }
+
+    @Override
+    public Optional<Long> expiresInMills() {
+        return Optional.ofNullable(this.expiresInMills);
+    }
+
+    private String getTokenFromFile() {
+        try {
+            return FileIOUtils.readFileUtf8(new File(tokenFilePath));
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProviderFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProviderFactory.java
new file mode 100644
index 0000000000..a0fa6b405d
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProviderFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.options.Options;
+
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_EXPIRATION_TIME;
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER_PATH;
+
+/** factory for create {@link BearTokenCredentialsProvider}. */
+public class BearTokenFileCredentialsProviderFactory implements 
CredentialsProviderFactory {
+
+    @Override
+    public String identifier() {
+        return CredentialsProviderType.BEAR_TOKEN_FILE.name();
+    }
+
+    @Override
+    public CredentialsProvider create(Options options) {
+        if (!options.getOptional(TOKEN_PROVIDER_PATH).isPresent()) {
+            throw new IllegalArgumentException(TOKEN_PROVIDER_PATH.key() + " 
is required");
+        }
+        String tokenFilePath = options.get(TOKEN_PROVIDER_PATH);
+        if (options.getOptional(TOKEN_EXPIRATION_TIME).isPresent()) {
+            long tokenExpireInMills = 
options.get(TOKEN_EXPIRATION_TIME).toMillis();
+            return new BearTokenFileCredentialsProvider(tokenFilePath, 
tokenExpireInMills);
+
+        } else {
+            return new BearTokenFileCredentialsProvider(tokenFilePath);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProvider.java
similarity index 57%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProvider.java
index cf61caa20e..7fe8008e59 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProvider.java
@@ -16,16 +16,35 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.rest;
-
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
-
-/** Internal options for REST Catalog. */
-public class RESTCatalogInternalOptions {
-    public static final ConfigOption<String> PREFIX =
-            ConfigOptions.key("prefix")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("REST Catalog uri's prefix.");
+package org.apache.paimon.rest.auth;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Credentials provider. */
+public interface CredentialsProvider {
+
+    Map<String, String> authHeader();
+
+    boolean refresh();
+
+    default boolean supportRefresh() {
+        return false;
+    }
+
+    default boolean keepRefreshed() {
+        return false;
+    }
+
+    default boolean willSoonExpire() {
+        return false;
+    }
+
+    default Optional<Long> expiresAtMillis() {
+        return Optional.empty();
+    }
+
+    default Optional<Long> expiresInMills() {
+        return Optional.empty();
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderFactory.java
new file mode 100644
index 0000000000..50c3564ad8
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogOptions;
+
+import static 
org.apache.paimon.rest.RESTCatalogInternalOptions.CREDENTIALS_PROVIDER;
+
+/** Factory for creating {@link CredentialsProvider}. */
+public interface CredentialsProviderFactory extends Factory {
+
+    default CredentialsProvider create(Options options) {
+        throw new UnsupportedOperationException(
+                "Use  create(context) for " + this.getClass().getSimpleName());
+    }
+
+    static CredentialsProvider createCredentialsProvider(Options options, 
ClassLoader classLoader) {
+        String credentialsProviderIdentifier = 
getCredentialsProviderTypeByConf(options).name();
+        CredentialsProviderFactory credentialsProviderFactory =
+                FactoryUtil.discoverFactory(
+                        classLoader,
+                        CredentialsProviderFactory.class,
+                        credentialsProviderIdentifier);
+        return credentialsProviderFactory.create(options);
+    }
+
+    static CredentialsProviderType getCredentialsProviderTypeByConf(Options 
options) {
+        if (options.getOptional(CREDENTIALS_PROVIDER).isPresent()) {
+            return 
CredentialsProviderType.valueOf(options.get(CREDENTIALS_PROVIDER));
+        } else if 
(options.getOptional(RESTCatalogOptions.TOKEN_PROVIDER_PATH).isPresent()) {
+            return CredentialsProviderType.BEAR_TOKEN_FILE;
+        }
+        return CredentialsProviderType.BEAR_TOKEN;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderType.java
similarity index 64%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderType.java
index cf61caa20e..28c344d70e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderType.java
@@ -16,16 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.rest;
+package org.apache.paimon.rest.auth;
 
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
-
-/** Internal options for REST Catalog. */
-public class RESTCatalogInternalOptions {
-    public static final ConfigOption<String> PREFIX =
-            ConfigOptions.key("prefix")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("REST Catalog uri's prefix.");
+/** Credentials provider type. */
+public enum CredentialsProviderType {
+    BEAR_TOKEN,
+    BEAR_TOKEN_FILE
 }
diff --git 
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 3b98eef52c..6416edd720 100644
--- 
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -37,3 +37,5 @@ 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap64AggFac
 org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory
 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldThetaSketchAggFactory
 org.apache.paimon.rest.RESTCatalogFactory
+org.apache.paimon.rest.auth.BearTokenCredentialsProviderFactory
+org.apache.paimon.rest.auth.BearTokenFileCredentialsProviderFactory
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
index 1140e39982..17c13b932f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.rest;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.paimon.rest.auth.BearTokenCredentialsProvider;
+import org.apache.paimon.rest.auth.CredentialsProvider;
+
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import okhttp3.mockwebserver.MockResponse;
@@ -33,8 +35,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.paimon.rest.RESTCatalog.AUTH_HEADER;
-import static org.apache.paimon.rest.RESTCatalog.AUTH_HEADER_VALUE_FORMAT;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -70,7 +70,8 @@ public class HttpClientTest {
         mockResponseData = new MockRESTData(MOCK_PATH);
         mockResponseDataStr = 
objectMapper.writeValueAsString(mockResponseData);
         httpClient = new HttpClient(httpClientOptions);
-        headers = ImmutableMap.of(AUTH_HEADER, 
String.format(AUTH_HEADER_VALUE_FORMAT, TOKEN));
+        CredentialsProvider credentialsProvider = new 
BearTokenCredentialsProvider(TOKEN);
+        headers = credentialsProvider.authHeader();
     }
 
     @After
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
new file mode 100644
index 0000000000..81b3ea57b7
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.ThreadPoolUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static 
org.apache.paimon.rest.auth.AuthSession.TOKEN_REFRESH_NUM_RETRIES;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Test for {@link AuthSession}. */
+public class AuthSessionTest {
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    @Test
+    public void testRefreshBearTokenFileCredentialsProvider()
+            throws IOException, InterruptedException {
+        String fileName = "token";
+        Pair<File, String> tokenFile2Token = 
generateTokenAndWriteToFile(fileName);
+        String token = tokenFile2Token.getRight();
+        File tokenFile = tokenFile2Token.getLeft();
+        Map<String, String> initialHeaders = new HashMap<>();
+        long expiresInMillis = 1000L;
+        CredentialsProvider credentialsProvider =
+                new BearTokenFileCredentialsProvider(tokenFile.getPath(), 
expiresInMillis);
+        ScheduledExecutorService executor =
+                ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
+        AuthSession session =
+                AuthSession.fromRefreshCredentialsProvider(
+                        executor, initialHeaders, credentialsProvider);
+        Map<String, String> header = session.getHeaders();
+        assertEquals(header.get("Authorization"), "Bearer " + token);
+        tokenFile.delete();
+        tokenFile2Token = generateTokenAndWriteToFile(fileName);
+        token = tokenFile2Token.getRight();
+        Thread.sleep(expiresInMillis + 500L);
+        header = session.getHeaders();
+        assertEquals(header.get("Authorization"), "Bearer " + token);
+    }
+
+    @Test
+    public void testRefreshCredentialsProviderIsSoonExpire()
+            throws IOException, InterruptedException {
+        String fileName = "token";
+        Pair<File, String> tokenFile2Token = 
generateTokenAndWriteToFile(fileName);
+        String token = tokenFile2Token.getRight();
+        File tokenFile = tokenFile2Token.getLeft();
+        Map<String, String> initialHeaders = new HashMap<>();
+        long expiresInMillis = 1000L;
+        CredentialsProvider credentialsProvider =
+                new BearTokenFileCredentialsProvider(tokenFile.getPath(), 
expiresInMillis);
+        AuthSession session =
+                AuthSession.fromRefreshCredentialsProvider(
+                        null, initialHeaders, credentialsProvider);
+        Map<String, String> header = session.getHeaders();
+        assertEquals(header.get("Authorization"), "Bearer " + token);
+        tokenFile.delete();
+        tokenFile2Token = generateTokenAndWriteToFile(fileName);
+        token = tokenFile2Token.getRight();
+        tokenFile = tokenFile2Token.getLeft();
+        FileUtils.writeStringToFile(tokenFile, token);
+        Thread.sleep(
+                (long) (expiresInMillis * (1 - 
BearTokenFileCredentialsProvider.EXPIRED_FACTOR))
+                        + 10L);
+        header = session.getHeaders();
+        assertEquals(header.get("Authorization"), "Bearer " + token);
+    }
+
+    @Test
+    public void testRetryWhenRefreshFail() throws Exception {
+        Map<String, String> initialHeaders = new HashMap<>();
+        CredentialsProvider credentialsProvider =
+                Mockito.mock(BearTokenFileCredentialsProvider.class);
+        long expiresAtMillis = System.currentTimeMillis() - 1000L;
+        
when(credentialsProvider.expiresAtMillis()).thenReturn(Optional.of(expiresAtMillis));
+        
when(credentialsProvider.expiresInMills()).thenReturn(Optional.of(50L));
+        when(credentialsProvider.supportRefresh()).thenReturn(true);
+        when(credentialsProvider.keepRefreshed()).thenReturn(true);
+        when(credentialsProvider.refresh()).thenReturn(false);
+        AuthSession session =
+                AuthSession.fromRefreshCredentialsProvider(
+                        null, initialHeaders, credentialsProvider);
+        AuthSession.scheduleTokenRefresh(
+                ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"),
+                session,
+                expiresAtMillis);
+        Thread.sleep(10_000L);
+        verify(credentialsProvider, Mockito.times(TOKEN_REFRESH_NUM_RETRIES + 
1)).refresh();
+    }
+
+    private Pair<File, String> generateTokenAndWriteToFile(String fileName) 
throws IOException {
+        File tokenFile = folder.newFile(fileName);
+        String token = UUID.randomUUID().toString();
+        FileUtils.writeStringToFile(tokenFile, token);
+        return Pair.of(tokenFile, token);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/CredentialsProviderFactoryTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/CredentialsProviderFactoryTest.java
new file mode 100644
index 0000000000..e62a65a79a
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/CredentialsProviderFactoryTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogOptions;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+
+import static 
org.apache.paimon.rest.RESTCatalogInternalOptions.CREDENTIALS_PROVIDER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+/** Test for {@link CredentialsProviderFactory}. */
+public class CredentialsProviderFactoryTest {
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    @Test
+    public void testCreateBearTokenCredentialsProviderSuccess() {
+        Options options = new Options();
+        String token = UUID.randomUUID().toString();
+        options.set(RESTCatalogOptions.TOKEN, token);
+        BearTokenCredentialsProvider credentialsProvider =
+                (BearTokenCredentialsProvider)
+                        CredentialsProviderFactory.createCredentialsProvider(
+                                options, this.getClass().getClassLoader());
+        assertEquals(token, credentialsProvider.token());
+    }
+
+    @Test
+    public void testCreateBearTokenCredentialsProviderFail() {
+        Options options = new Options();
+        assertThrows(
+                IllegalArgumentException.class,
+                () ->
+                        CredentialsProviderFactory.createCredentialsProvider(
+                                options, this.getClass().getClassLoader()));
+    }
+
+    @Test
+    public void testCreateBearTokenFileCredentialsProviderSuccess() throws 
Exception {
+        Options options = new Options();
+        String fileName = "token";
+        File tokenFile = folder.newFile(fileName);
+        String token = UUID.randomUUID().toString();
+        FileUtils.writeStringToFile(tokenFile, token);
+        options.set(RESTCatalogOptions.TOKEN_PROVIDER_PATH, 
tokenFile.getPath());
+        BearTokenFileCredentialsProvider credentialsProvider =
+                (BearTokenFileCredentialsProvider)
+                        CredentialsProviderFactory.createCredentialsProvider(
+                                options, this.getClass().getClassLoader());
+        assertEquals(token, credentialsProvider.token());
+    }
+
+    @Test
+    public void testCreateBearTokenFileCredentialsProviderFail() throws 
Exception {
+        Options options = new Options();
+        options.set(CREDENTIALS_PROVIDER, 
CredentialsProviderType.BEAR_TOKEN_FILE.name());
+        assertThrows(
+                IllegalArgumentException.class,
+                () ->
+                        CredentialsProviderFactory.createCredentialsProvider(
+                                options, this.getClass().getClassLoader()));
+    }
+
+    @Test
+    public void testCreateRefreshBearTokenFileCredentialsProviderSuccess() 
throws Exception {
+        Options options = new Options();
+        String fileName = "token";
+        File tokenFile = folder.newFile(fileName);
+        String token = UUID.randomUUID().toString();
+        FileUtils.writeStringToFile(tokenFile, token);
+        options.set(RESTCatalogOptions.TOKEN_PROVIDER_PATH, 
tokenFile.getPath());
+        options.set(RESTCatalogOptions.TOKEN_EXPIRATION_TIME, 
Duration.ofSeconds(10L));
+        BearTokenFileCredentialsProvider credentialsProvider =
+                (BearTokenFileCredentialsProvider)
+                        CredentialsProviderFactory.createCredentialsProvider(
+                                options, this.getClass().getClassLoader());
+        assertEquals(token, credentialsProvider.token());
+    }
+
+    @Test
+    public void getCredentialsProviderTypeByConfWhenDefineTokenPath() {
+        Options options = new Options();
+        options.set(RESTCatalogOptions.TOKEN_PROVIDER_PATH, "/a/b/c");
+        assertEquals(
+                CredentialsProviderType.BEAR_TOKEN_FILE,
+                
CredentialsProviderFactory.getCredentialsProviderTypeByConf(options));
+    }
+
+    @Test
+    public void getCredentialsProviderTypeByConfWhenConfNotDefined() {
+        Options options = new Options();
+        assertEquals(
+                CredentialsProviderType.BEAR_TOKEN,
+                
CredentialsProviderFactory.getCredentialsProviderTypeByConf(options));
+    }
+
+    @Test
+    public void getCredentialsProviderTypeByConfWhenDefineProviderType() {
+        Options options = new Options();
+        options.set(CREDENTIALS_PROVIDER, 
CredentialsProviderType.BEAR_TOKEN_FILE.name());
+        assertEquals(
+                CredentialsProviderType.BEAR_TOKEN_FILE,
+                
CredentialsProviderFactory.getCredentialsProviderTypeByConf(options));
+    }
+}

Reply via email to