This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 37ee30db56c branch-3.1: [feat](iceberg) Supports OAuth2 to connect 
Iceberg Rest and supports vended-credentials #52831 (#53685)
37ee30db56c is described below

commit 37ee30db56c1d915a3f8654a47829607c19d3850
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jul 22 15:01:25 2025 +0800

    branch-3.1: [feat](iceberg) Supports OAuth2 to connect Iceberg Rest and 
supports vended-credentials #52831 (#53685)
    
    Cherry-picked from #52831
    
    Co-authored-by: zy-kkk <[email protected]>
---
 .../iceberg/IcebergRestExternalCatalog.java        |  66 +---
 .../iceberg/IcebergVendedCredentialsProvider.java  | 140 ++++++++
 .../datasource/iceberg/source/IcebergScanNode.java |   8 +-
 .../metastore/IcebergPropertiesFactory.java        |  49 +++
 .../property/metastore/IcebergRestProperties.java  | 263 ++++++++++++++-
 .../property/metastore/MetastoreProperties.java    |   7 +-
 .../datasource/property/storage/S3Properties.java  |  16 +-
 .../property/storage/StorageProperties.java        |  17 +
 .../org/apache/doris/planner/IcebergTableSink.java |  12 +-
 .../IcebergVendedCredentialsProviderTest.java      | 371 +++++++++++++++++++++
 .../metastore/IcebergRestPropertiesTest.java       | 234 +++++++++++++
 .../property/storage/S3PropertiesTest.java         |  82 ++++-
 12 files changed, 1177 insertions(+), 88 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
index b92d2c91f96..760ec55bb36 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
@@ -18,76 +18,38 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.datasource.CatalogProperty;
-import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.metastore.IcebergRestProperties;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.aws.AwsClientProperties;
-import org.apache.iceberg.aws.s3.S3FileIOProperties;
 
-import java.util.HashMap;
 import java.util.Map;
 
 public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
 
     public IcebergRestExternalCatalog(long catalogId, String name, String 
resource, Map<String, String> props,
-                                      String comment) {
+            String comment) {
         super(catalogId, name, comment);
-        props = PropertyConverter.convertToMetaProperties(props);
         catalogProperty = new CatalogProperty(resource, props);
     }
 
     @Override
     protected void initCatalog() {
         icebergCatalogType = ICEBERG_REST;
+        // 1. Get properties for REST catalog service.
+        IcebergRestProperties icebergRestProperties = (IcebergRestProperties) 
catalogProperty.getMetastoreProperties();
 
-        Configuration conf = replaceS3Properties(getConfiguration());
+        // 2. Get FileIO properties for REST catalog service.
+        Map<String, String> fileIOProperties = Maps.newHashMap();
+        Configuration conf = new Configuration();
+        
icebergRestProperties.toFileIOProperties(catalogProperty.getStoragePropertiesMap(),
 fileIOProperties, conf);
 
-        catalog = CatalogUtil.buildIcebergCatalog(getName(),
-                convertToRestCatalogProperties(),
-                conf);
-    }
-
-    private Configuration replaceS3Properties(Configuration conf) {
-        Map<String, String> catalogProperties = 
catalogProperty.getHadoopProperties();
-        initS3Param(conf);
-        String usePahStyle = 
catalogProperties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "true");
-        // Set path style
-        conf.set(PropertyConverter.USE_PATH_STYLE, usePahStyle);
-        conf.set(Constants.PATH_STYLE_ACCESS, usePahStyle);
-        // Get AWS client retry limit
-        conf.set(Constants.RETRY_LIMIT, 
catalogProperties.getOrDefault(Constants.RETRY_LIMIT, "1"));
-        conf.set(Constants.RETRY_THROTTLE_LIMIT, 
catalogProperties.getOrDefault(Constants.RETRY_THROTTLE_LIMIT, "1"));
-        conf.set(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT,
-                
catalogProperties.getOrDefault(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT, "1"));
-        return conf;
-    }
-
-    private Map<String, String> convertToRestCatalogProperties() {
+        // 3. Merge properties for REST catalog service.
+        Map<String, String> options = 
Maps.newHashMap(icebergRestProperties.getIcebergRestCatalogProperties());
+        options.putAll(fileIOProperties);
 
-        Map<String, String> props = catalogProperty.getProperties();
-        Map<String, String> restProperties = new HashMap<>(props);
-        restProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
-        String restUri = props.getOrDefault(CatalogProperties.URI, "");
-        restProperties.put(CatalogProperties.URI, restUri);
-        if (props.containsKey(S3Properties.ENDPOINT)) {
-            restProperties.put(S3FileIOProperties.ENDPOINT, 
props.get(S3Properties.ENDPOINT));
-        }
-        if (props.containsKey(S3Properties.ACCESS_KEY)) {
-            restProperties.put(S3FileIOProperties.ACCESS_KEY_ID, 
props.get(S3Properties.ACCESS_KEY));
-        }
-        if (props.containsKey(S3Properties.SECRET_KEY)) {
-            restProperties.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
props.get(S3Properties.SECRET_KEY));
-        }
-        if (props.containsKey(S3Properties.REGION)) {
-            restProperties.put(AwsClientProperties.CLIENT_REGION, 
props.get(S3Properties.REGION));
-        }
-        if (props.containsKey(PropertyConverter.USE_PATH_STYLE)) {
-            restProperties.put(S3FileIOProperties.PATH_STYLE_ACCESS, 
props.get(PropertyConverter.USE_PATH_STYLE));
-        }
-        return restProperties;
+        // 4. Build iceberg catalog
+        catalog = CatalogUtil.buildIcebergCatalog(getName(), options, conf);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergVendedCredentialsProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergVendedCredentialsProvider.java
new file mode 100644
index 00000000000..06a8b40e2b7
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergVendedCredentialsProvider.java
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.property.metastore.IcebergRestProperties;
+import org.apache.doris.datasource.property.metastore.MetastoreProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties.Type;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
+
+import java.util.Map;
+
+public class IcebergVendedCredentialsProvider {
+
+    // AWS credential property keys for backend
+    private static final String BACKEND_AWS_ACCESS_KEY = "AWS_ACCESS_KEY";
+    private static final String BACKEND_AWS_SECRET_KEY = "AWS_SECRET_KEY";
+    private static final String BACKEND_AWS_TOKEN = "AWS_TOKEN";
+
+    /**
+     * Interface for future FileIO-based credential extraction.
+     * This interface is designed to be compatible with Iceberg FileIO
+     * when we implement FileIO support in the future.
+     */
+    public interface CredentialExtractor {
+        /**
+         * Extract credentials from a generic properties map.
+         *
+         * @param properties properties map from any source (FileIO, Table IO, 
etc.)
+         * @return extracted credentials as backend properties
+         */
+        Map<String, String> extractCredentials(Map<String, String> properties);
+    }
+
+    /**
+     * Default credential extractor for S3 credentials.
+     */
+    public static class S3CredentialExtractor implements CredentialExtractor {
+        @Override
+        public Map<String, String> extractCredentials(Map<String, String> 
properties) {
+            Map<String, String> credentials = Maps.newHashMap();
+
+            if (properties == null || properties.isEmpty()) {
+                return credentials;
+            }
+
+            // Extract AWS credentials from Iceberg S3 FileIO format
+            if (properties.containsKey(S3FileIOProperties.ACCESS_KEY_ID)) {
+                credentials.put(BACKEND_AWS_ACCESS_KEY, 
properties.get(S3FileIOProperties.ACCESS_KEY_ID));
+            }
+            if (properties.containsKey(S3FileIOProperties.SECRET_ACCESS_KEY)) {
+                credentials.put(BACKEND_AWS_SECRET_KEY, 
properties.get(S3FileIOProperties.SECRET_ACCESS_KEY));
+            }
+            if (properties.containsKey(S3FileIOProperties.SESSION_TOKEN)) {
+                credentials.put(BACKEND_AWS_TOKEN, 
properties.get(S3FileIOProperties.SESSION_TOKEN));
+            }
+
+            return credentials;
+        }
+    }
+
+    private static final S3CredentialExtractor s3Extractor = new 
S3CredentialExtractor();
+
+    /**
+     * Extract vended credentials from Iceberg Table and convert to backend 
properties.
+     *
+     * @param table the Iceberg table
+     * @return Map of backend properties with credentials
+     */
+    public static Map<String, String> extractVendedCredentialsFromTable(Table 
table) {
+        if (table == null || table.io() == null) {
+            return Maps.newHashMap();
+        }
+
+        Map<String, String> ioProperties = table.io().properties();
+        return s3Extractor.extractCredentials(ioProperties);
+    }
+
+    /**
+     * Future method for FileIO-based credential extraction.
+     * This method signature is designed to be compatible with future FileIO 
implementations.
+     *
+     * @param fileIoProperties properties from FileIO (reserved for future use)
+     * @param extractor custom credential extractor
+     * @return extracted credentials
+     */
+    @VisibleForTesting
+    public static Map<String, String> extractCredentialsFromFileIO(Map<String, 
String> fileIoProperties,
+            CredentialExtractor extractor) {
+        return extractor.extractCredentials(fileIoProperties);
+    }
+
+    /**
+     * Get backend location properties for Iceberg catalog with optional 
vended credentials support.
+     * This method extracts the duplicate logic from IcebergScanNode and 
IcebergTableSink.
+     *
+     * @param storagePropertiesMap Map of storage properties
+     * @param icebergTable Optional Iceberg table for vended credentials 
extraction
+     * @return Map of backend location properties
+     */
+    public static Map<String, String> getBackendLocationProperties(
+            MetastoreProperties metastoreProperties,
+            Map<Type, StorageProperties> storagePropertiesMap,
+            Table icebergTable) {
+        boolean vendedCredentialsEnabled = false;
+        if (metastoreProperties instanceof IcebergRestProperties) {
+            vendedCredentialsEnabled =
+                    ((IcebergRestProperties) 
metastoreProperties).isIcebergRestVendedCredentialsEnabled();
+        }
+        Map<String, String> locationProperties = Maps.newHashMap();
+        for (StorageProperties storageProperties : 
storagePropertiesMap.values()) {
+            if (vendedCredentialsEnabled && icebergTable != null) {
+                Map<String, String> vendedCredentials = 
extractVendedCredentialsFromTable(icebergTable);
+                
locationProperties.putAll(storageProperties.getBackendConfigProperties(vendedCredentials));
+            } else {
+                
locationProperties.putAll(storageProperties.getBackendConfigProperties());
+            }
+        }
+        return locationProperties;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index d9a12fababf..d4cdf3a91a1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -37,6 +37,7 @@ import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.iceberg.IcebergVendedCredentialsProvider;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.SessionVariable;
@@ -523,7 +524,12 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     @Override
     public Map<String, String> getLocationProperties() throws UserException {
-        return source.getCatalog().getCatalogProperty().getHadoopProperties();
+        IcebergExternalCatalog catalog = (IcebergExternalCatalog) 
source.getCatalog();
+        return IcebergVendedCredentialsProvider.getBackendLocationProperties(
+                catalog.getCatalogProperty().getMetastoreProperties(),
+                catalog.getCatalogProperty().getStoragePropertiesMap(),
+                icebergTable
+        );
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java
new file mode 100644
index 00000000000..7474747cfaa
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.common.UserException;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+
+public class IcebergPropertiesFactory implements MetastorePropertiesFactory {
+
+    private static final Map<String, Function<Map<String, String>, 
MetastoreProperties>> REGISTERED_SUBTYPES =
+            new HashMap<>();
+
+    static {
+        register("rest", IcebergRestProperties::new);
+    }
+
+    public static void register(String subType, Function<Map<String, String>, 
MetastoreProperties> constructor) {
+        REGISTERED_SUBTYPES.put(subType.toLowerCase(Locale.ROOT), constructor);
+    }
+
+    @Override
+    public MetastoreProperties create(Map<String, String> props) throws 
UserException {
+        String subType = props.getOrDefault("iceberg.catalog.type", 
"rest").toLowerCase(Locale.ROOT);
+        Function<Map<String, String>, MetastoreProperties> constructor =
+                REGISTERED_SUBTYPES.getOrDefault(subType, 
REGISTERED_SUBTYPES.get("rest"));
+        MetastoreProperties instance = constructor.apply(props);
+        instance.initNormalizeAndCheckProps();
+        return instance;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
index 987aa0d519b..9f9af0f85be 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
@@ -18,39 +18,284 @@
 package org.apache.doris.datasource.property.metastore;
 
 import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.ParamRules;
+import 
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
+import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.logging.log4j.util.Strings;
+
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 public class IcebergRestProperties extends MetastoreProperties {
 
-    @ConnectorProperty(names = {"iceberg.rest.uri"},
+    // REST catalog property constants
+    private static final String PREFIX_PROPERTY = "prefix";
+    private static final String VENDED_CREDENTIALS_HEADER = 
"header.X-Iceberg-Access-Delegation";
+    private static final String VENDED_CREDENTIALS_VALUE = 
"vended-credentials";
+
+    private Map<String, String> icebergRestCatalogProperties;
+
+    @ConnectorProperty(names = {"iceberg.rest.uri", "uri"},
             description = "The uri of the iceberg rest catalog service.")
     private String icebergRestUri = "";
 
+    @ConnectorProperty(names = {"iceberg.rest.prefix"},
+            required = false,
+            description = "The prefix of the iceberg rest catalog service.")
+    private String icebergRestPrefix = "";
+
+    @ConnectorProperty(names = {"iceberg.rest.warehouse", "warehouse"},
+            required = false,
+            description = "The warehouse of the iceberg rest catalog service.")
+    private String icebergRestWarehouse = "";
+
     @ConnectorProperty(names = {"iceberg.rest.security.type"},
             required = false,
-            supported = false,
-            description = "The security type of the iceberg rest catalog 
service.")
+            description = "The security type of the iceberg rest catalog 
service,"
+                    + "optional: (none, oauth2), default: none.")
     private String icebergRestSecurityType = "none";
 
-    @ConnectorProperty(names = {"iceberg.rest.prefix"},
+    @ConnectorProperty(names = {"iceberg.rest.session"},
             required = false,
             supported = false,
-            description = "The prefix of the iceberg rest catalog service.")
-    private String icebergRestPrefix = "";
+            description = "The session type of the iceberg rest catalog 
service,"
+                    + "optional: (none, user), default: none.")
+    private String icebergRestSession = "none";
+
+    @ConnectorProperty(names = {"iceberg.rest.session-timeout"},
+            required = false,
+            supported = false,
+            description = "The session timeout of the iceberg rest catalog 
service.")
+    private String icebergRestSessionTimeout = "0";
+
+    @ConnectorProperty(names = {"iceberg.rest.oauth2.token"},
+            required = false,
+            description = "The oauth2 token for the iceberg rest catalog 
service.")
+    private String icebergRestOauth2Token;
+
+    @ConnectorProperty(names = {"iceberg.rest.oauth2.credential"},
+            required = false,
+            description = "The oauth2 credential for the iceberg rest catalog 
service.")
+    private String icebergRestOauth2Credential;
+
+    @ConnectorProperty(names = {"iceberg.rest.oauth2.scope"},
+            required = false,
+            description = "The oauth2 scope for the iceberg rest catalog 
service.")
+    private String icebergRestOauth2Scope;
+
+    @ConnectorProperty(names = {"iceberg.rest.oauth2.server-uri"},
+            required = false,
+            description = "The oauth2 server uri for fetching token.")
+    private String icebergRestOauth2ServerUri;
+
+    @ConnectorProperty(names = {"iceberg.rest.oauth2.token-refresh-enabled"},
+            required = false,
+            description = "Enable oauth2 token refresh for the iceberg rest 
catalog service.")
+    private String icebergRestOauth2TokenRefreshEnabled = "false";
+
+    @ConnectorProperty(names = {"iceberg.rest.vended-credentials-enabled"},
+            required = false,
+            description = "Enable vended credentials for the iceberg rest 
catalog service.")
+    private String icebergRestVendedCredentialsEnabled = "false";
+
+    @ConnectorProperty(names = {"iceberg.rest.nested-namespace-enabled"},
+            required = false,
+            supported = false,
+            description = "Enable nested namespace for the iceberg rest 
catalog service.")
+    private String icebergRestNestedNamespaceEnabled = "true";
+
+    @ConnectorProperty(names = {"iceberg.rest.case-insensitive-name-matching"},
+            required = false,
+            supported = false,
+            description = "Enable case insensitive name matching for the 
iceberg rest catalog service.")
+    private String icebergRestCaseInsensitiveNameMatching = "false";
+
+    @ConnectorProperty(names = 
{"iceberg.rest.case-insensitive-name-matching.cache-ttl"},
+            required = false,
+            supported = false,
+            description = "The cache TTL for case insensitive name matching in 
ms.")
+    private String icebergRestCaseInsensitiveNameMatchingCacheTtlMs = "0";
 
     public IcebergRestProperties(Map<String, String> origProps) {
         super(Type.ICEBERG_REST, origProps);
     }
 
+    @Override
+    public void initNormalizeAndCheckProps() {
+        super.initNormalizeAndCheckProps();
+        validateSecurityType();
+        buildRules().validate();
+        initIcebergRestCatalogProperties();
+    }
+
     @Override
     protected void checkRequiredProperties() {
     }
 
-    public void toIcebergRestCatalogProperties(Map<String, String> 
catalogProps) {
+    private void validateSecurityType() {
+        try {
+            Security.valueOf(icebergRestSecurityType.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Invalid security type: " + 
icebergRestSecurityType
+                    + ". Supported values are: none, oauth2");
+        }
+    }
+
+    private ParamRules buildRules() {
+        ParamRules rules = new ParamRules()
+                // OAuth2 requires either credential or token, but not both
+                .mutuallyExclusive(icebergRestOauth2Credential, 
icebergRestOauth2Token,
+                        "OAuth2 cannot have both credential and token 
configured")
+                // If using credential flow, server URI is required
+                .requireAllIfPresent(icebergRestOauth2Credential,
+                        new String[] {icebergRestOauth2ServerUri},
+                        "OAuth2 credential flow requires server-uri");
+
+        // Custom validation: OAuth2 scope should not be used with token
+        if (Strings.isNotBlank(icebergRestOauth2Token) && 
Strings.isNotBlank(icebergRestOauth2Scope)) {
+            throw new IllegalArgumentException("OAuth2 scope is only 
applicable when using credential, not token");
+        }
+        // Custom validation: If OAuth2 is enabled, require either credential 
or token
+        if ("oauth2".equalsIgnoreCase(icebergRestSecurityType)) {
+            boolean hasCredential = 
Strings.isNotBlank(icebergRestOauth2Credential);
+            boolean hasToken = Strings.isNotBlank(icebergRestOauth2Token);
+            if (!hasCredential && !hasToken) {
+                throw new IllegalArgumentException("OAuth2 requires either 
credential or token");
+            }
+        }
+        return rules;
+    }
+
+    private void initIcebergRestCatalogProperties() {
+        icebergRestCatalogProperties = new HashMap<>();
+        // Core catalog properties
+        addCoreCatalogProperties();
+        // Optional properties
+        addOptionalProperties();
+        // Authentication properties
+        addAuthenticationProperties();
+    }
+
+    private void addCoreCatalogProperties() {
         // See CatalogUtil.java
-        catalogProps.put("type", "rest");
+        icebergRestCatalogProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
         // See CatalogProperties.java
-        catalogProps.put("uri", icebergRestUri);
+        icebergRestCatalogProperties.put(CatalogProperties.URI, 
icebergRestUri);
+    }
+
+    private void addOptionalProperties() {
+        if (Strings.isNotBlank(icebergRestPrefix)) {
+            icebergRestCatalogProperties.put(PREFIX_PROPERTY, 
icebergRestPrefix);
+        }
+
+        if (Strings.isNotBlank(icebergRestWarehouse)) {
+            
icebergRestCatalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, 
icebergRestWarehouse);
+        }
+
+        if (isIcebergRestVendedCredentialsEnabled()) {
+            icebergRestCatalogProperties.put(VENDED_CREDENTIALS_HEADER, 
VENDED_CREDENTIALS_VALUE);
+        }
+    }
+
+    private void addAuthenticationProperties() {
+        Security security = 
Security.valueOf(icebergRestSecurityType.toUpperCase());
+        if (security == Security.OAUTH2) {
+            addOAuth2Properties();
+        }
+    }
+
+    private void addOAuth2Properties() {
+        if (Strings.isNotBlank(icebergRestOauth2Credential)) {
+            // Client Credentials Flow
+            icebergRestCatalogProperties.put(OAuth2Properties.CREDENTIAL, 
icebergRestOauth2Credential);
+            
icebergRestCatalogProperties.put(OAuth2Properties.OAUTH2_SERVER_URI, 
icebergRestOauth2ServerUri);
+            if (Strings.isNotBlank(icebergRestOauth2Scope)) {
+                icebergRestCatalogProperties.put(OAuth2Properties.SCOPE, 
icebergRestOauth2Scope);
+            }
+            
icebergRestCatalogProperties.put(OAuth2Properties.TOKEN_REFRESH_ENABLED,
+                    icebergRestOauth2TokenRefreshEnabled);
+        } else {
+            // Pre-configured Token Flow
+            icebergRestCatalogProperties.put(OAuth2Properties.TOKEN, 
icebergRestOauth2Token);
+        }
+    }
+
+    public Map<String, String> getIcebergRestCatalogProperties() {
+        return Collections.unmodifiableMap(icebergRestCatalogProperties);
+    }
+
+    public boolean isIcebergRestVendedCredentialsEnabled() {
+        return Boolean.parseBoolean(icebergRestVendedCredentialsEnabled);
+    }
+
+    /**
+     * Unified method to configure FileIO properties for Iceberg catalog.
+     * This method handles all storage types (HDFS, S3, MinIO, etc.) and 
populates
+     * the fileIOProperties map and Configuration object accordingly.
+     *
+     * @param storagePropertiesMap Map of storage properties
+     * @param fileIOProperties Options map to be populated
+     * @param conf Configuration object to be populated (for HDFS), will be 
created if null and HDFS is used
+     */
+    public void toFileIOProperties(Map<StorageProperties.Type, 
StorageProperties> storagePropertiesMap,
+            Map<String, String> fileIOProperties, Configuration conf) {
+
+        for (StorageProperties storageProperties : 
storagePropertiesMap.values()) {
+            if (storageProperties instanceof HdfsCompatibleProperties) {
+                
storageProperties.getBackendConfigProperties().forEach(conf::set);
+            } else if (storageProperties instanceof 
AbstractS3CompatibleProperties) {
+                // For all S3-compatible storage types, put properties in 
fileIOProperties map
+                toS3FileIOProperties((AbstractS3CompatibleProperties) 
storageProperties, fileIOProperties);
+            } else {
+                // For other storage types, just use fileIOProperties map
+                
fileIOProperties.putAll(storageProperties.getBackendConfigProperties());
+            }
+        }
+
+    }
+
+    /**
+     * Configure S3 FileIO properties for all S3-compatible storage types (S3, 
MinIO, etc.)
+     * This method provides a unified way to convert S3-compatible properties 
to Iceberg S3FileIO format.
+     *
+     * @param s3Properties S3-compatible properties
+     * @param options Options map to be populated with S3 FileIO properties
+     */
+    public void toS3FileIOProperties(AbstractS3CompatibleProperties 
s3Properties, Map<String, String> options) {
+        // Common properties - only set if not blank
+        if (StringUtils.isNotBlank(s3Properties.getEndpoint())) {
+            options.put(S3FileIOProperties.ENDPOINT, 
s3Properties.getEndpoint());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getUsePathStyle())) {
+            options.put(S3FileIOProperties.PATH_STYLE_ACCESS, 
s3Properties.getUsePathStyle());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getRegion())) {
+            options.put(AwsClientProperties.CLIENT_REGION, 
s3Properties.getRegion());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getAccessKey())) {
+            options.put(S3FileIOProperties.ACCESS_KEY_ID, 
s3Properties.getAccessKey());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getSecretKey())) {
+            options.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
s3Properties.getSecretKey());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
+            options.put(S3FileIOProperties.SESSION_TOKEN, 
s3Properties.getSessionToken());
+        }
+    }
+
+
+    public enum Security {
+        NONE,
+        OAUTH2,
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
index 186a3b0c22f..7fcccfd74e1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
@@ -48,6 +48,7 @@ public class MetastoreProperties extends ConnectionProperties 
{
         GLUE("glue"),
         DLF("dlf"),
         ICEBERG_REST("rest"),
+        ICEBERG("iceberg"),
         DATAPROC("dataproc"),
         FILE_SYSTEM("filesystem", "hadoop"),
         UNKNOWN();
@@ -82,11 +83,7 @@ public class MetastoreProperties extends 
ConnectionProperties {
     static {
         //subclasses should be registered here
         register(Type.HMS, new HMSPropertiesFactory());
-        register(Type.ICEBERG_REST, props -> {
-            IcebergRestProperties inst = new IcebergRestProperties(props);
-            inst.initNormalizeAndCheckProps();
-            return inst;
-        });
+        register(Type.ICEBERG, new IcebergPropertiesFactory());
         register(Type.FILE_SYSTEM, props -> {
             FileMetastoreProperties inst = new FileMetastoreProperties(props);
             inst.initNormalizeAndCheckProps();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index f036494875e..91cc2e0b221 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -164,6 +164,12 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
         if (StringUtils.isNotBlank(s3ExternalId) && 
StringUtils.isNotBlank(s3IAMRole)) {
             return;
         }
+        // When using vended credentials with a REST catalog, AK/SK are not 
provided directly.
+        // The credentials will be fetched from the REST service later.
+        // So we skip the credential check in this case.
+        if 
(Boolean.parseBoolean(origProps.getOrDefault("iceberg.rest.vended-credentials-enabled",
 "false"))) {
+            return;
+        }
         throw new StoragePropertiesException("Please set s3.access_key and 
s3.secret_key or s3.role_arn and "
                 + "s3.external_id");
     }
@@ -229,15 +235,6 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
         options.set("s3.secret-key", s3SecretKey);
     }*/
 
-    public void toIcebergS3FileIOProperties(Map<String, String> catalogProps) {
-        // See S3FileIOProperties.java
-        catalogProps.put("s3.endpoint", endpoint);
-        catalogProps.put("s3.access-key-id", accessKey);
-        catalogProps.put("s3.secret-access-key", secretKey);
-        catalogProps.put("client.region", region);
-        catalogProps.put("s3.path-style-access", usePathStyle);
-    }
-
     @Override
     public Map<String, String> getBackendConfigProperties() {
         Map<String, String> backendProperties = 
generateBackendS3Configuration(s3ConnectionMaximum,
@@ -283,5 +280,4 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
                 ProfileCredentialsProvider.create(),
                 InstanceProfileCredentialsProvider.create());
     }
-
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 3f8ed8157fd..5eaa1e0aca4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -27,6 +27,7 @@ import lombok.Getter;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -64,6 +65,22 @@ public abstract class StorageProperties extends 
ConnectionProperties {
 
     public abstract Map<String, String> getBackendConfigProperties();
 
+    /**
+     * Get backend configuration properties with optional runtime properties.
+     * This method allows passing runtime properties (like vended credentials)
+     * that should be merged with the base configuration.
+     *
+     * @param runtimeProperties additional runtime properties to merge, can be 
null
+     * @return Map of backend properties including runtime properties
+     */
+    public Map<String, String> getBackendConfigProperties(Map<String, String> 
runtimeProperties) {
+        Map<String, String> properties = new 
HashMap<>(getBackendConfigProperties());
+        if (runtimeProperties != null && !runtimeProperties.isEmpty()) {
+            properties.putAll(runtimeProperties);
+        }
+        return properties;
+    }
+
     @Getter
     protected Type type;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index 6ad1d6ddd8b..a3918a3ce54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -19,8 +19,10 @@ package org.apache.doris.planner;
 
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.iceberg.IcebergVendedCredentialsProvider;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.thrift.TDataSink;
@@ -43,7 +45,6 @@ import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.types.Types;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
@@ -129,9 +130,12 @@ public class IcebergTableSink extends 
BaseExternalTableDataSink {
         
tSink.setCompressionType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable)));
 
         // hadoop config
-        HashMap<String, String> props = new 
HashMap<>(icebergTable.properties());
-        Map<String, String> catalogProps = 
targetTable.getCatalog().getProperties();
-        props.putAll(catalogProps);
+        IcebergExternalCatalog catalog = (IcebergExternalCatalog) 
targetTable.getCatalog();
+        Map<String, String> props = 
IcebergVendedCredentialsProvider.getBackendLocationProperties(
+                catalog.getCatalogProperty().getMetastoreProperties(),
+                catalog.getCatalogProperty().getStoragePropertiesMap(),
+                icebergTable
+        );
         tSink.setHadoopConfig(props);
 
         // location
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergVendedCredentialsProviderTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergVendedCredentialsProviderTest.java
new file mode 100644
index 00000000000..cf46193f15b
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergVendedCredentialsProviderTest.java
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.property.metastore.IcebergRestProperties;
+import org.apache.doris.datasource.property.metastore.MetastoreProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties.Type;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.io.FileIO;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IcebergVendedCredentialsProviderTest {
+
+    @Test
+    public void testS3CredentialExtractorWithValidCredentials() {
+        IcebergVendedCredentialsProvider.S3CredentialExtractor extractor =
+                new IcebergVendedCredentialsProvider.S3CredentialExtractor();
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put(S3FileIOProperties.ACCESS_KEY_ID, "test-access-key");
+        properties.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
"test-secret-key");
+        properties.put(S3FileIOProperties.SESSION_TOKEN, "test-session-token");
+
+        Map<String, String> credentials = 
extractor.extractCredentials(properties);
+
+        Assertions.assertEquals("test-access-key", 
credentials.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("test-secret-key", 
credentials.get("AWS_SECRET_KEY"));
+        Assertions.assertEquals("test-session-token", 
credentials.get("AWS_TOKEN"));
+    }
+
+    @Test
+    public void testS3CredentialExtractorWithPartialCredentials() {
+        IcebergVendedCredentialsProvider.S3CredentialExtractor extractor =
+                new IcebergVendedCredentialsProvider.S3CredentialExtractor();
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put(S3FileIOProperties.ACCESS_KEY_ID, "test-access-key");
+        properties.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
"test-secret-key");
+        // No session token
+
+        Map<String, String> credentials = 
extractor.extractCredentials(properties);
+
+        Assertions.assertEquals("test-access-key", 
credentials.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("test-secret-key", 
credentials.get("AWS_SECRET_KEY"));
+        Assertions.assertFalse(credentials.containsKey("AWS_TOKEN"));
+    }
+
+    @Test
+    public void testS3CredentialExtractorWithEmptyProperties() {
+        IcebergVendedCredentialsProvider.S3CredentialExtractor extractor =
+                new IcebergVendedCredentialsProvider.S3CredentialExtractor();
+
+        Map<String, String> credentials = extractor.extractCredentials(new 
HashMap<>());
+        Assertions.assertTrue(credentials.isEmpty());
+
+        credentials = extractor.extractCredentials(null);
+        Assertions.assertTrue(credentials.isEmpty());
+    }
+
+    @Test
+    public void testS3CredentialExtractorWithNonAwsProperties() {
+        IcebergVendedCredentialsProvider.S3CredentialExtractor extractor =
+                new IcebergVendedCredentialsProvider.S3CredentialExtractor();
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("some.other.property", "value");
+        properties.put("unrelated.key", "unrelated-value");
+
+        Map<String, String> credentials = 
extractor.extractCredentials(properties);
+        Assertions.assertTrue(credentials.isEmpty());
+    }
+
+    @Test
+    public void testExtractVendedCredentialsFromTable() {
+        Table table = Mockito.mock(Table.class);
+        FileIO fileIO = Mockito.mock(FileIO.class);
+
+        Map<String, String> ioProperties = new HashMap<>();
+        ioProperties.put(S3FileIOProperties.ACCESS_KEY_ID, 
"vended-access-key");
+        ioProperties.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
"vended-secret-key");
+        ioProperties.put(S3FileIOProperties.SESSION_TOKEN, 
"vended-session-token");
+
+        Mockito.when(table.io()).thenReturn(fileIO);
+        Mockito.when(fileIO.properties()).thenReturn(ioProperties);
+
+        Map<String, String> credentials = 
IcebergVendedCredentialsProvider.extractVendedCredentialsFromTable(table);
+
+        Assertions.assertEquals("vended-access-key", 
credentials.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("vended-secret-key", 
credentials.get("AWS_SECRET_KEY"));
+        Assertions.assertEquals("vended-session-token", 
credentials.get("AWS_TOKEN"));
+    }
+
+    @Test
+    public void testExtractVendedCredentialsFromTableWithNullTable() {
+        Map<String, String> credentials = 
IcebergVendedCredentialsProvider.extractVendedCredentialsFromTable(null);
+        Assertions.assertTrue(credentials.isEmpty());
+    }
+
+    @Test
+    public void testExtractVendedCredentialsFromTableWithNullIO() {
+        Table table = Mockito.mock(Table.class);
+        Mockito.when(table.io()).thenReturn(null);
+
+        Map<String, String> credentials = 
IcebergVendedCredentialsProvider.extractVendedCredentialsFromTable(table);
+        Assertions.assertTrue(credentials.isEmpty());
+    }
+
+    @Test
+    public void testExtractCredentialsFromFileIO() {
+        Map<String, String> fileIoProperties = new HashMap<>();
+        fileIoProperties.put(S3FileIOProperties.ACCESS_KEY_ID, 
"fileio-access-key");
+        fileIoProperties.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
"fileio-secret-key");
+
+        IcebergVendedCredentialsProvider.CredentialExtractor extractor =
+                new IcebergVendedCredentialsProvider.S3CredentialExtractor();
+
+        Map<String, String> credentials = IcebergVendedCredentialsProvider
+                .extractCredentialsFromFileIO(fileIoProperties, extractor);
+
+        Assertions.assertEquals("fileio-access-key", 
credentials.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("fileio-secret-key", 
credentials.get("AWS_SECRET_KEY"));
+        Assertions.assertFalse(credentials.containsKey("AWS_TOKEN"));
+    }
+
+    @Test
+    public void testCustomCredentialExtractor() {
+        IcebergVendedCredentialsProvider.CredentialExtractor customExtractor =
+                new IcebergVendedCredentialsProvider.CredentialExtractor() {
+                    @Override
+                    public Map<String, String> extractCredentials(Map<String, 
String> properties) {
+                        Map<String, String> result = new HashMap<>();
+                        if (properties != null && 
properties.containsKey("custom.key")) {
+                            result.put("CUSTOM_BACKEND_PROPERTY", 
properties.get("custom.key"));
+                        }
+                        return result;
+                    }
+                };
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("custom.key", "custom-value");
+
+        Map<String, String> credentials = IcebergVendedCredentialsProvider
+                .extractCredentialsFromFileIO(properties, customExtractor);
+
+        Assertions.assertEquals("custom-value", 
credentials.get("CUSTOM_BACKEND_PROPERTY"));
+    }
+
+    @Test
+    public void 
testGetBackendLocationPropertiesWithRestVendedCredentialsEnabled() {
+        // Mock IcebergRestProperties with vended credentials enabled
+        IcebergRestProperties restProperties = 
Mockito.mock(IcebergRestProperties.class);
+        
Mockito.when(restProperties.isIcebergRestVendedCredentialsEnabled()).thenReturn(true);
+
+        // Mock storage properties
+        StorageProperties storageProperties = 
Mockito.mock(StorageProperties.class);
+        Map<String, String> baseProperties = new HashMap<>();
+        baseProperties.put("base.property", "base-value");
+
+        Map<String, String> vendedProperties = new HashMap<>();
+        vendedProperties.put("base.property", "base-value");
+        vendedProperties.put("AWS_ACCESS_KEY", "vended-access-key");
+        vendedProperties.put("AWS_SECRET_KEY", "vended-secret-key");
+        vendedProperties.put("AWS_TOKEN", "vended-session-token");
+
+        
Mockito.when(storageProperties.getBackendConfigProperties()).thenReturn(baseProperties);
+        
Mockito.when(storageProperties.getBackendConfigProperties(Mockito.anyMap())).thenReturn(vendedProperties);
+
+        Map<Type, StorageProperties> storagePropertiesMap = new HashMap<>();
+        storagePropertiesMap.put(Type.S3, storageProperties);
+
+        // Mock table with vended credentials
+        Table table = Mockito.mock(Table.class);
+        FileIO fileIO = Mockito.mock(FileIO.class);
+        Map<String, String> ioProperties = new HashMap<>();
+        ioProperties.put(S3FileIOProperties.ACCESS_KEY_ID, 
"vended-access-key");
+        ioProperties.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
"vended-secret-key");
+        ioProperties.put(S3FileIOProperties.SESSION_TOKEN, 
"vended-session-token");
+
+        Mockito.when(table.io()).thenReturn(fileIO);
+        Mockito.when(fileIO.properties()).thenReturn(ioProperties);
+
+        // Test with vended credentials enabled
+        Map<String, String> result = 
IcebergVendedCredentialsProvider.getBackendLocationProperties(
+                restProperties, storagePropertiesMap, table);
+
+        Assertions.assertEquals("base-value", result.get("base.property"));
+        Assertions.assertEquals("vended-access-key", 
result.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("vended-secret-key", 
result.get("AWS_SECRET_KEY"));
+        Assertions.assertEquals("vended-session-token", 
result.get("AWS_TOKEN"));
+    }
+
+    @Test
+    public void 
testGetBackendLocationPropertiesWithRestVendedCredentialsDisabled() {
+        // Mock IcebergRestProperties with vended credentials disabled
+        IcebergRestProperties restProperties = 
Mockito.mock(IcebergRestProperties.class);
+        
Mockito.when(restProperties.isIcebergRestVendedCredentialsEnabled()).thenReturn(false);
+
+        // Mock storage properties
+        StorageProperties storageProperties = 
Mockito.mock(StorageProperties.class);
+        Map<String, String> baseProperties = new HashMap<>();
+        baseProperties.put("base.property", "base-value");
+        baseProperties.put("AWS_ACCESS_KEY", "static-access-key");
+        baseProperties.put("AWS_SECRET_KEY", "static-secret-key");
+
+        
Mockito.when(storageProperties.getBackendConfigProperties()).thenReturn(baseProperties);
+
+        Map<Type, StorageProperties> storagePropertiesMap = new HashMap<>();
+        storagePropertiesMap.put(Type.S3, storageProperties);
+
+        // Mock table (should be ignored since vended credentials are disabled)
+        Table table = Mockito.mock(Table.class);
+        FileIO fileIO = Mockito.mock(FileIO.class);
+        Map<String, String> ioProperties = new HashMap<>();
+        ioProperties.put(S3FileIOProperties.ACCESS_KEY_ID, 
"vended-access-key");
+        ioProperties.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
"vended-secret-key");
+
+        Mockito.when(table.io()).thenReturn(fileIO);
+        Mockito.when(fileIO.properties()).thenReturn(ioProperties);
+
+        // Test with vended credentials disabled
+        Map<String, String> result = 
IcebergVendedCredentialsProvider.getBackendLocationProperties(
+                restProperties, storagePropertiesMap, table);
+
+        Assertions.assertEquals("base-value", result.get("base.property"));
+        Assertions.assertEquals("static-access-key", 
result.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("static-secret-key", 
result.get("AWS_SECRET_KEY"));
+        Assertions.assertFalse(result.containsKey("AWS_TOKEN"));
+    }
+
+    @Test
+    public void testGetBackendLocationPropertiesWithNonRestMetastore() {
+        // Mock non-REST metastore properties
+        MetastoreProperties metastoreProperties = 
Mockito.mock(MetastoreProperties.class);
+
+        // Mock storage properties
+        StorageProperties storageProperties = 
Mockito.mock(StorageProperties.class);
+        Map<String, String> baseProperties = new HashMap<>();
+        baseProperties.put("base.property", "base-value");
+        baseProperties.put("AWS_ACCESS_KEY", "static-access-key");
+
+        
Mockito.when(storageProperties.getBackendConfigProperties()).thenReturn(baseProperties);
+
+        Map<Type, StorageProperties> storagePropertiesMap = new HashMap<>();
+        storagePropertiesMap.put(Type.S3, storageProperties);
+
+        // Mock table (should be ignored for non-REST catalogs)
+        Table table = Mockito.mock(Table.class);
+
+        // Test with non-REST metastore
+        Map<String, String> result = 
IcebergVendedCredentialsProvider.getBackendLocationProperties(
+                metastoreProperties, storagePropertiesMap, table);
+
+        Assertions.assertEquals("base-value", result.get("base.property"));
+        Assertions.assertEquals("static-access-key", 
result.get("AWS_ACCESS_KEY"));
+        Assertions.assertFalse(result.containsKey("AWS_TOKEN"));
+    }
+
+    @Test
+    public void testGetBackendLocationPropertiesWithMultipleStorageTypes() {
+        // Mock IcebergRestProperties with vended credentials enabled
+        IcebergRestProperties restProperties = 
Mockito.mock(IcebergRestProperties.class);
+        
Mockito.when(restProperties.isIcebergRestVendedCredentialsEnabled()).thenReturn(true);
+
+        // Mock S3 storage properties
+        StorageProperties s3Properties = Mockito.mock(StorageProperties.class);
+        Map<String, String> s3BaseProperties = new HashMap<>();
+        s3BaseProperties.put("s3.property", "s3-value");
+
+        Map<String, String> s3VendedProperties = new HashMap<>();
+        s3VendedProperties.put("s3.property", "s3-value");
+        s3VendedProperties.put("AWS_ACCESS_KEY", "vended-access-key");
+
+        
Mockito.when(s3Properties.getBackendConfigProperties()).thenReturn(s3BaseProperties);
+        
Mockito.when(s3Properties.getBackendConfigProperties(Mockito.anyMap())).thenReturn(s3VendedProperties);
+
+        // Mock HDFS storage properties
+        StorageProperties hdfsProperties = 
Mockito.mock(StorageProperties.class);
+        Map<String, String> hdfsBaseProperties = new HashMap<>();
+        hdfsBaseProperties.put("hdfs.property", "hdfs-value");
+
+        
Mockito.when(hdfsProperties.getBackendConfigProperties()).thenReturn(hdfsBaseProperties);
+        
Mockito.when(hdfsProperties.getBackendConfigProperties(Mockito.anyMap())).thenReturn(hdfsBaseProperties);
+
+        Map<Type, StorageProperties> storagePropertiesMap = new HashMap<>();
+        storagePropertiesMap.put(Type.S3, s3Properties);
+        storagePropertiesMap.put(Type.HDFS, hdfsProperties);
+
+        // Mock table with vended credentials
+        Table table = Mockito.mock(Table.class);
+        FileIO fileIO = Mockito.mock(FileIO.class);
+        Map<String, String> ioProperties = new HashMap<>();
+        ioProperties.put(S3FileIOProperties.ACCESS_KEY_ID, 
"vended-access-key");
+
+        Mockito.when(table.io()).thenReturn(fileIO);
+        Mockito.when(fileIO.properties()).thenReturn(ioProperties);
+
+        // Test with multiple storage types
+        Map<String, String> result = 
IcebergVendedCredentialsProvider.getBackendLocationProperties(
+                restProperties, storagePropertiesMap, table);
+
+        Assertions.assertEquals("s3-value", result.get("s3.property"));
+        Assertions.assertEquals("hdfs-value", result.get("hdfs.property"));
+        Assertions.assertEquals("vended-access-key", 
result.get("AWS_ACCESS_KEY"));
+    }
+
+    @Test
+    public void testGetBackendLocationPropertiesWithNullTable() {
+        // Mock IcebergRestProperties with vended credentials enabled
+        IcebergRestProperties restProperties = 
Mockito.mock(IcebergRestProperties.class);
+        
Mockito.when(restProperties.isIcebergRestVendedCredentialsEnabled()).thenReturn(true);
+
+        // Mock storage properties
+        StorageProperties storageProperties = 
Mockito.mock(StorageProperties.class);
+        Map<String, String> baseProperties = new HashMap<>();
+        baseProperties.put("base.property", "base-value");
+
+        
Mockito.when(storageProperties.getBackendConfigProperties()).thenReturn(baseProperties);
+
+        Map<Type, StorageProperties> storagePropertiesMap = new HashMap<>();
+        storagePropertiesMap.put(Type.S3, storageProperties);
+
+        // Test with null table (should fall back to base properties)
+        Map<String, String> result = 
IcebergVendedCredentialsProvider.getBackendLocationProperties(
+                restProperties, storagePropertiesMap, null);
+
+        Assertions.assertEquals("base-value", result.get("base.property"));
+        Assertions.assertFalse(result.containsKey("AWS_ACCESS_KEY"));
+    }
+
+    @Test
+    public void testGetBackendLocationPropertiesWithEmptyStorageMap() {
+        // Mock IcebergRestProperties
+        IcebergRestProperties restProperties = 
Mockito.mock(IcebergRestProperties.class);
+        
Mockito.when(restProperties.isIcebergRestVendedCredentialsEnabled()).thenReturn(true);
+
+        // Empty storage properties map
+        Map<Type, StorageProperties> storagePropertiesMap = new HashMap<>();
+
+        // Mock table
+        Table table = Mockito.mock(Table.class);
+
+        // Test with empty storage map
+        Map<String, String> result = 
IcebergVendedCredentialsProvider.getBackendLocationProperties(
+                restProperties, storagePropertiesMap, table);
+
+        Assertions.assertTrue(result.isEmpty());
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergRestPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergRestPropertiesTest.java
new file mode 100644
index 00000000000..16aae778ada
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergRestPropertiesTest.java
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IcebergRestPropertiesTest {
+
+    @Test
+    public void testBasicRestProperties() {
+        Map<String, String> props = new HashMap<>();
+        props.put("iceberg.rest.uri", "http://localhost:8080";);
+        props.put("iceberg.rest.warehouse", "s3://warehouse/path");
+        props.put("iceberg.rest.prefix", "prefix");
+
+        IcebergRestProperties restProps = new IcebergRestProperties(props);
+        restProps.initNormalizeAndCheckProps();
+
+        Map<String, String> catalogProps = 
restProps.getIcebergRestCatalogProperties();
+        Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST,
+                catalogProps.get(CatalogUtil.ICEBERG_CATALOG_TYPE));
+        Assertions.assertEquals("http://localhost:8080";, 
catalogProps.get(CatalogProperties.URI));
+        Assertions.assertEquals("s3://warehouse/path", 
catalogProps.get(CatalogProperties.WAREHOUSE_LOCATION));
+        Assertions.assertEquals("prefix", catalogProps.get("prefix"));
+    }
+
+    @Test
+    public void testVendedCredentialsEnabled() {
+        Map<String, String> props = new HashMap<>();
+        props.put("iceberg.rest.uri", "http://localhost:8080";);
+        props.put("iceberg.rest.vended-credentials-enabled", "true");
+
+        IcebergRestProperties restProps = new IcebergRestProperties(props);
+        restProps.initNormalizeAndCheckProps();
+
+        
Assertions.assertTrue(restProps.isIcebergRestVendedCredentialsEnabled());
+        Map<String, String> catalogProps = 
restProps.getIcebergRestCatalogProperties();
+        Assertions.assertEquals("vended-credentials", 
catalogProps.get("header.X-Iceberg-Access-Delegation"));
+    }
+
+    @Test
+    public void testVendedCredentialsDisabled() {
+        Map<String, String> props = new HashMap<>();
+        props.put("iceberg.rest.uri", "http://localhost:8080";);
+        props.put("iceberg.rest.vended-credentials-enabled", "false");
+
+        IcebergRestProperties restProps = new IcebergRestProperties(props);
+        restProps.initNormalizeAndCheckProps();
+
+        
Assertions.assertFalse(restProps.isIcebergRestVendedCredentialsEnabled());
+        Map<String, String> catalogProps = 
restProps.getIcebergRestCatalogProperties();
+        
Assertions.assertFalse(catalogProps.containsKey("header.X-Iceberg-Access-Delegation"));
+    }
+
+    @Test
+    public void testOAuth2CredentialFlow() {
+        Map<String, String> props = new HashMap<>();
+        props.put("iceberg.rest.uri", "http://localhost:8080";);
+        props.put("iceberg.rest.security.type", "oauth2");
+        props.put("iceberg.rest.oauth2.credential", "client_credentials");
+        props.put("iceberg.rest.oauth2.server-uri", 
"http://auth.example.com/token";);
+        props.put("iceberg.rest.oauth2.scope", "read write");
+
+        IcebergRestProperties restProps = new IcebergRestProperties(props);
+        restProps.initNormalizeAndCheckProps();
+
+        Map<String, String> catalogProps = 
restProps.getIcebergRestCatalogProperties();
+        Assertions.assertEquals("client_credentials", 
catalogProps.get(OAuth2Properties.CREDENTIAL));
+        Assertions.assertEquals("http://auth.example.com/token";, 
catalogProps.get(OAuth2Properties.OAUTH2_SERVER_URI));
+        Assertions.assertEquals("read write", 
catalogProps.get(OAuth2Properties.SCOPE));
+        Assertions.assertEquals("false", 
catalogProps.get(OAuth2Properties.TOKEN_REFRESH_ENABLED));
+    }
+
+    @Test
+    public void testOAuth2TokenFlow() {
+        Map<String, String> props = new HashMap<>();
+        props.put("iceberg.rest.uri", "http://localhost:8080";);
+        props.put("iceberg.rest.security.type", "oauth2");
+        props.put("iceberg.rest.oauth2.token", "my-access-token");
+
+        IcebergRestProperties restProps = new IcebergRestProperties(props);
+        restProps.initNormalizeAndCheckProps();
+
+        Map<String, String> catalogProps = 
restProps.getIcebergRestCatalogProperties();
+        Assertions.assertEquals("my-access-token", 
catalogProps.get(OAuth2Properties.TOKEN));
+        
Assertions.assertFalse(catalogProps.containsKey(OAuth2Properties.CREDENTIAL));
+        
Assertions.assertFalse(catalogProps.containsKey(OAuth2Properties.OAUTH2_SERVER_URI));
+        
Assertions.assertFalse(catalogProps.containsKey(OAuth2Properties.SCOPE));
+    }
+
+    @Test
+    public void testOAuth2ValidationErrors() {
+        // Test: both credential and token provided
+        Map<String, String> props1 = new HashMap<>();
+        props1.put("iceberg.rest.uri", "http://localhost:8080";);
+        props1.put("iceberg.rest.security.type", "oauth2");
+        props1.put("iceberg.rest.oauth2.credential", "client_credentials");
+        props1.put("iceberg.rest.oauth2.token", "my-token");
+
+        IcebergRestProperties restProps1 = new IcebergRestProperties(props1);
+        Assertions.assertThrows(IllegalArgumentException.class, 
restProps1::initNormalizeAndCheckProps);
+
+        // Test: OAuth2 enabled but no credential or token
+        Map<String, String> props2 = new HashMap<>();
+        props2.put("iceberg.rest.uri", "http://localhost:8080";);
+        props2.put("iceberg.rest.security.type", "oauth2");
+
+        IcebergRestProperties restProps2 = new IcebergRestProperties(props2);
+        Assertions.assertThrows(IllegalArgumentException.class, 
restProps2::initNormalizeAndCheckProps);
+
+        // Test: credential flow without server URI
+        Map<String, String> props3 = new HashMap<>();
+        props3.put("iceberg.rest.uri", "http://localhost:8080";);
+        props3.put("iceberg.rest.security.type", "oauth2");
+        props3.put("iceberg.rest.oauth2.credential", "client_credentials");
+
+        IcebergRestProperties restProps3 = new IcebergRestProperties(props3);
+        Assertions.assertThrows(IllegalArgumentException.class, 
restProps3::initNormalizeAndCheckProps);
+
+        // Test: scope with token (should fail)
+        Map<String, String> props4 = new HashMap<>();
+        props4.put("iceberg.rest.uri", "http://localhost:8080";);
+        props4.put("iceberg.rest.security.type", "oauth2");
+        props4.put("iceberg.rest.oauth2.token", "my-token");
+        props4.put("iceberg.rest.oauth2.scope", "read");
+
+        IcebergRestProperties restProps4 = new IcebergRestProperties(props4);
+        Assertions.assertThrows(IllegalArgumentException.class, 
restProps4::initNormalizeAndCheckProps);
+    }
+
+    @Test
+    public void testInvalidSecurityType() {
+        Map<String, String> props = new HashMap<>();
+        props.put("iceberg.rest.uri", "http://localhost:8080";);
+        props.put("iceberg.rest.security.type", "invalid");
+
+        IcebergRestProperties restProps = new IcebergRestProperties(props);
+        Assertions.assertThrows(IllegalArgumentException.class, 
restProps::initNormalizeAndCheckProps);
+    }
+
+    @Test
+    public void testSecurityTypeNone() {
+        Map<String, String> props = new HashMap<>();
+        props.put("iceberg.rest.uri", "http://localhost:8080";);
+        props.put("iceberg.rest.security.type", "none");
+
+        IcebergRestProperties restProps = new IcebergRestProperties(props);
+        restProps.initNormalizeAndCheckProps();
+
+        Map<String, String> catalogProps = 
restProps.getIcebergRestCatalogProperties();
+        // Should only have basic properties, no OAuth2 properties
+        Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST,
+                catalogProps.get(CatalogUtil.ICEBERG_CATALOG_TYPE));
+        Assertions.assertEquals("http://localhost:8080";, 
catalogProps.get(CatalogProperties.URI));
+        
Assertions.assertFalse(catalogProps.containsKey(OAuth2Properties.CREDENTIAL));
+        
Assertions.assertFalse(catalogProps.containsKey(OAuth2Properties.TOKEN));
+    }
+
+    @Test
+    public void testUriAliases() {
+        // Test different URI property names
+        Map<String, String> props1 = new HashMap<>();
+        props1.put("uri", "http://localhost:8080";);
+        IcebergRestProperties restProps1 = new IcebergRestProperties(props1);
+        restProps1.initNormalizeAndCheckProps();
+        Assertions.assertEquals("http://localhost:8080";,
+                
restProps1.getIcebergRestCatalogProperties().get(CatalogProperties.URI));
+
+        Map<String, String> props2 = new HashMap<>();
+        props2.put("iceberg.rest.uri", "http://localhost:8080";);
+        IcebergRestProperties restProps2 = new IcebergRestProperties(props2);
+        restProps2.initNormalizeAndCheckProps();
+        Assertions.assertEquals("http://localhost:8080";,
+                
restProps2.getIcebergRestCatalogProperties().get(CatalogProperties.URI));
+    }
+
+    @Test
+    public void testWarehouseAliases() {
+        // Test different warehouse property names
+        Map<String, String> props1 = new HashMap<>();
+        props1.put("iceberg.rest.uri", "http://localhost:8080";);
+        props1.put("warehouse", "s3://warehouse/path");
+        IcebergRestProperties restProps1 = new IcebergRestProperties(props1);
+        restProps1.initNormalizeAndCheckProps();
+        Assertions.assertEquals("s3://warehouse/path",
+                
restProps1.getIcebergRestCatalogProperties().get(CatalogProperties.WAREHOUSE_LOCATION));
+
+        Map<String, String> props2 = new HashMap<>();
+        props2.put("iceberg.rest.uri", "http://localhost:8080";);
+        props2.put("iceberg.rest.warehouse", "s3://warehouse/path");
+        IcebergRestProperties restProps2 = new IcebergRestProperties(props2);
+        restProps2.initNormalizeAndCheckProps();
+        Assertions.assertEquals("s3://warehouse/path",
+                
restProps2.getIcebergRestCatalogProperties().get(CatalogProperties.WAREHOUSE_LOCATION));
+    }
+
+    @Test
+    public void testImmutablePropertiesMap() {
+        Map<String, String> props = new HashMap<>();
+        props.put("iceberg.rest.uri", "http://localhost:8080";);
+
+        IcebergRestProperties restProps = new IcebergRestProperties(props);
+        restProps.initNormalizeAndCheckProps();
+
+        Map<String, String> catalogProps = 
restProps.getIcebergRestCatalogProperties();
+
+        // Should throw UnsupportedOperationException when trying to modify
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
+            catalogProps.put("test", "value");
+        });
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
index b6d087542f4..d81f5148573 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
@@ -60,11 +60,14 @@ public class S3PropertiesTest {
         origProps = new HashMap<>();
         origProps.put("s3.endpoint", "https://s3.example.com";);
         origProps.put(StorageProperties.FS_S3_SUPPORT, "true");
-        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Property cos.access_key is required.");
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps),
+                "Property cos.access_key is required.");
         origProps.put("s3.access_key", "myS3AccessKey");
-        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Property cos.secret_key is required.");
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps),
+                "Property cos.secret_key is required.");
         origProps.put("s3.secret_key", "myS3SecretKey");
-        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Invalid endpoint format: 
https://s3.example.com";);
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps),
+                "Invalid endpoint format: https://s3.example.com";);
         origProps.put("s3.endpoint", "s3.us-west-1.amazonaws.com");
         Assertions.assertDoesNotThrow(() -> 
StorageProperties.createAll(origProps));
     }
@@ -179,7 +182,6 @@ public class S3PropertiesTest {
         Assertions.assertEquals("myCOSSecretKey", s3Properties.getSecretKey());
         Assertions.assertEquals("s3.us-west-2.amazonaws.com", 
s3Properties.getEndpoint());
 
-
     }
 
     @Test
@@ -199,8 +201,9 @@ public class S3PropertiesTest {
         s3EndpointProps.put("oss.secret_key", "myCOSSecretKey");
         s3EndpointProps.put("oss.region", "cn-hangzhou");
         origProps.put("uri", "s3://examplebucket-1250000000/test/file.txt");
-        //not support
-        Assertions.assertThrowsExactly(StoragePropertiesException.class, () -> 
StorageProperties.createPrimary(s3EndpointProps), "Property cos.endpoint is 
required.");
+        // not support
+        Assertions.assertThrowsExactly(StoragePropertiesException.class,
+                () -> StorageProperties.createPrimary(s3EndpointProps), 
"Property cos.endpoint is required.");
     }
 
     @Test
@@ -217,7 +220,8 @@ public class S3PropertiesTest {
     }
 
     @Test
-    public void testGetAwsCredentialsProviderWithIamRoleAndExternalId(@Mocked 
StsClientBuilder mockBuilder, @Mocked StsClient mockStsClient, @Mocked 
InstanceProfileCredentialsProvider mockInstanceCreds) {
+    public void testGetAwsCredentialsProviderWithIamRoleAndExternalId(@Mocked 
StsClientBuilder mockBuilder,
+            @Mocked StsClient mockStsClient, @Mocked 
InstanceProfileCredentialsProvider mockInstanceCreds) {
 
         new Expectations() {
             {
@@ -315,4 +319,68 @@ public class S3PropertiesTest {
         origProps.put("s3.endpoint", invalidEndpoint3);
         Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createPrimary(origProps));
     }
+
+    @Test
+    public void testGetBackendConfigPropertiesWithRuntimeCredentials() throws 
UserException {
+        origProps.put("s3.endpoint", "s3.us-west-2.amazonaws.com");
+        origProps.put("s3.access_key", "base-access-key");
+        origProps.put("s3.secret_key", "base-secret-key");
+        origProps.put("s3.region", "us-west-2");
+
+        S3Properties s3Properties = (S3Properties) 
StorageProperties.createPrimary(origProps);
+
+        // Test with runtime credentials (vended credentials)
+        Map<String, String> runtimeCredentials = new HashMap<>();
+        runtimeCredentials.put("AWS_ACCESS_KEY", "vended-access-key");
+        runtimeCredentials.put("AWS_SECRET_KEY", "vended-secret-key");
+        runtimeCredentials.put("AWS_TOKEN", "vended-session-token");
+
+        Map<String, String> result = 
s3Properties.getBackendConfigProperties(runtimeCredentials);
+
+        // Runtime credentials should override base properties
+        Assertions.assertEquals("vended-access-key", 
result.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("vended-secret-key", 
result.get("AWS_SECRET_KEY"));
+        Assertions.assertEquals("vended-session-token", 
result.get("AWS_TOKEN"));
+
+        // Base properties not overridden should still be present
+        Assertions.assertEquals("us-west-2", result.get("AWS_REGION"));
+        Assertions.assertEquals("s3.us-west-2.amazonaws.com", 
result.get("AWS_ENDPOINT"));
+    }
+
+    @Test
+    public void testGetBackendConfigPropertiesWithNullRuntimeCredentials() 
throws UserException {
+        origProps.put("s3.endpoint", "s3.us-west-2.amazonaws.com");
+        origProps.put("s3.access_key", "base-access-key");
+        origProps.put("s3.secret_key", "base-secret-key");
+        origProps.put("s3.region", "us-west-2");
+
+        S3Properties s3Properties = (S3Properties) 
StorageProperties.createPrimary(origProps);
+
+        Map<String, String> result = 
s3Properties.getBackendConfigProperties(null);
+        Map<String, String> baseResult = 
s3Properties.getBackendConfigProperties();
+
+        // Should be identical to base properties when runtime credentials are 
null
+        Assertions.assertEquals(baseResult.size(), result.size());
+        Assertions.assertEquals("base-access-key", 
result.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("base-secret-key", 
result.get("AWS_SECRET_KEY"));
+        Assertions.assertEquals("us-west-2", result.get("AWS_REGION"));
+    }
+
+    @Test
+    public void testGetBackendConfigPropertiesWithEmptyRuntimeCredentials() 
throws UserException {
+        origProps.put("s3.endpoint", "s3.us-west-2.amazonaws.com");
+        origProps.put("s3.access_key", "base-access-key");
+        origProps.put("s3.secret_key", "base-secret-key");
+
+        S3Properties s3Properties = (S3Properties) 
StorageProperties.createPrimary(origProps);
+
+        Map<String, String> emptyCredentials = new HashMap<>();
+        Map<String, String> result = 
s3Properties.getBackendConfigProperties(emptyCredentials);
+        Map<String, String> baseResult = 
s3Properties.getBackendConfigProperties();
+
+        // Should be identical to base properties when runtime credentials are 
empty
+        Assertions.assertEquals(baseResult.size(), result.size());
+        Assertions.assertEquals("base-access-key", 
result.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("base-secret-key", 
result.get("AWS_SECRET_KEY"));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to