This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ffbacbcdc64 [Feat](params-refactor): Refactor Paimon Metastore 
parameter integration (#54114)
ffbacbcdc64 is described below

commit ffbacbcdc64b01b38292a7a26c2a182eaf517863
Author: Calvin Kirs <[email protected]>
AuthorDate: Sun Aug 3 00:50:05 2025 +0800

    [Feat](params-refactor): Refactor Paimon Metastore parameter integration 
(#54114)
    
    ### What problem does this PR solve?
    
    #50238
    
    
    https://github.com/apache/doris/commit/cba9b29bd0f9e3e18b90f18f6bab9e435ea90d50
    
    This PR introduces a refactored parameter integration framework for
    Apache Paimon metastore catalogs. The goal is to standardize parameter
    injection, improving configuration consistency and ease of use.
    
    Key Changes
    Introduced an abstract class AbstractPaimonProperties to provide a
    unified parameter injection workflow and extension point.
    
    The new parameter system supports multiple metastore types, including:
    
    HMS (Hive Metastore)
    
    Aliyun DLF (an HMS-based adaptation)
    
    FileSystem (local or HDFS)
    
    All configuration parameters are constructed through a standardized
    Options object, eliminating the need for ad-hoc parameter mapping or
    conversions.
    
    Subclasses inject metastore-specific parameters (e.g.,
    hive.metastore.uris, DLF credentials) by implementing the
    appendCustomCatalogOptions method.
---
 .../java/org/apache/doris/catalog/Resource.java    |  16 +-
 .../org/apache/doris/datasource/CatalogIf.java     |   7 +-
 .../org/apache/doris/datasource/CatalogLog.java    |   1 +
 .../org/apache/doris/datasource/CatalogMgr.java    |  17 --
 .../apache/doris/datasource/CatalogProperty.java   | 173 +++++++++++--------
 .../apache/doris/datasource/ExternalCatalog.java   |   5 -
 .../doris/datasource/hive/HMSExternalTable.java    |  10 +-
 .../doris/datasource/hive/source/HiveScanNode.java |   4 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |   4 +-
 .../datasource/iceberg/IcebergExternalCatalog.java |   9 -
 .../doris/datasource/jdbc/JdbcExternalCatalog.java |   2 +-
 .../paimon/PaimonDLFExternalCatalog.java           |  26 ---
 .../datasource/paimon/PaimonExternalCatalog.java   | 154 ++++++-----------
 .../paimon/PaimonExternalCatalogFactory.java       |   6 +-
 .../paimon/PaimonFileExternalCatalog.java          |  65 -------
 .../paimon/PaimonHMSExternalCatalog.java           |  36 ----
 .../datasource/paimon/source/PaimonScanNode.java   |  16 +-
 .../metastore/AbstractPaimonProperties.java        | 188 +++++++++++++++++++++
 .../property/metastore/MetastoreProperties.java    |   2 +
 .../PaimonAliyunDLFMetaStoreProperties.java        | 123 ++++++++++++++
 .../PaimonFileSystemMetaStoreProperties.java       |  76 +++++++++
 .../metastore/PaimonHMSMetaStoreProperties.java    | 117 +++++++++++++
 .../metastore/PaimonPropertiesFactory.java         |  37 ++++
 .../storage/AbstractS3CompatibleProperties.java    |  20 ++-
 .../datasource/property/storage/COSProperties.java |  16 +-
 .../property/storage/MinioProperties.java          |  15 --
 .../datasource/property/storage/OBSProperties.java |  11 +-
 .../datasource/property/storage/OSSProperties.java |  13 +-
 .../datasource/property/storage/S3Properties.java  |  53 ++----
 .../trees/plans/logical/LogicalHudiScan.java       |   2 +-
 .../org/apache/doris/planner/HiveTableSink.java    |   2 +-
 31 files changed, 751 insertions(+), 475 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
index eb5c41808b6..6250d92ab0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
@@ -24,7 +24,6 @@ import org.apache.doris.common.io.DeepCopy;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.proc.BaseProcResult;
-import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.nereids.trees.plans.commands.CreateResourceCommand;
 import org.apache.doris.nereids.trees.plans.commands.info.CreateResourceInfo;
 import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -292,20 +291,7 @@ public abstract class Resource implements Writable, 
GsonPostProcessable {
     private void notifyUpdate(Map<String, String> properties) {
         
references.entrySet().stream().collect(Collectors.groupingBy(Entry::getValue)).forEach((type,
 refs) -> {
             if (type == ReferenceType.CATALOG) {
-                for (Map.Entry<String, ReferenceType> ref : refs) {
-                    String catalogName = 
ref.getKey().split(REFERENCE_SPLIT)[0];
-                    CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
-                    if (catalog == null) {
-                        LOG.warn("Can't find the reference catalog {} for 
resource {}", catalogName, name);
-                        continue;
-                    }
-                    if (!name.equals(catalog.getResource())) {
-                        LOG.warn("Failed to update catalog {} for different 
resource "
-                                + "names(resource={}, catalog.resource={})", 
catalogName, name, catalog.getResource());
-                        continue;
-                    }
-                    catalog.notifyPropertiesUpdated(properties);
-                }
+                // No longer support resource in Catalog.
             }
         });
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
index c888e6e6541..df908cdc2c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
@@ -36,7 +36,6 @@ import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo
 import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -89,10 +88,6 @@ public interface CatalogIf<T extends DatabaseIf> {
 
     Map<String, String> getProperties();
 
-    default String getResource() {
-        return null;
-    }
-
     default void notifyPropertiesUpdated(Map<String, String> updatedProps) {
         if (this instanceof ExternalCatalog) {
             ((ExternalCatalog) this).resetToUninitialized(false);
@@ -175,7 +170,7 @@ public interface CatalogIf<T extends DatabaseIf> {
         CatalogLog log = new CatalogLog();
         log.setCatalogId(getId());
         log.setCatalogName(getName());
-        log.setResource(Strings.nullToEmpty(getResource()));
+        log.setResource("");
         log.setComment(getComment());
         log.setProps(getProperties());
         return log;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
index d862a3ef44c..08a08d49d84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
@@ -56,6 +56,7 @@ public class CatalogLog implements Writable {
     @SerializedName(value = "invalidCache")
     private boolean invalidCache;
 
+    @Deprecated
     @SerializedName(value = "resource")
     private String resource;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index bf4d7e4b5e1..f4b2d4cc816 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -23,8 +23,6 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EnvFactory;
-import org.apache.doris.catalog.Resource;
-import org.apache.doris.catalog.Resource.ReferenceType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
@@ -121,12 +119,6 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
         if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
             ((ExternalCatalog) catalog).resetToUninitialized(false);
         }
-        if (!Strings.isNullOrEmpty(catalog.getResource())) {
-            Resource resource = 
Env.getCurrentEnv().getResourceMgr().getResource(catalog.getResource());
-            if (resource != null) {
-                resource.addReference(catalog.getName(), 
ReferenceType.CATALOG);
-            }
-        }
     }
 
     private CatalogIf removeCatalog(long catalogId) {
@@ -139,12 +131,6 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
                 ConnectContext.get().removeLastDBOfCatalog(catalog.getName());
             }
             
Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId());
-            if (!Strings.isNullOrEmpty(catalog.getResource())) {
-                Resource catalogResource = 
Env.getCurrentEnv().getResourceMgr().getResource(catalog.getResource());
-                if (catalogResource != null) {
-                    catalogResource.removeReference(catalog.getName(), 
ReferenceType.CATALOG);
-                }
-            }
             Env.getCurrentEnv().getQueryStats().clear(catalog.getId());
         }
         return catalog;
@@ -436,9 +422,6 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
                             ConnectContext.get().getQualifiedUser(),
                             catalog.getName());
                 }
-                if (!Strings.isNullOrEmpty(catalog.getResource())) {
-                    rows.add(Arrays.asList("resource", catalog.getResource()));
-                }
                 Map<String, String> sortedMap = 
getCatalogPropertiesWithPrintable(catalog);
                 sortedMap.forEach((k, v) -> rows.add(Arrays.asList(k, v)));
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
index ebb7c6f292d..da598e982e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
@@ -17,14 +17,11 @@
 
 package org.apache.doris.datasource;
 
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.Resource;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.metastore.MetastoreProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import org.apache.commons.collections.MapUtils;
@@ -35,6 +32,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * CatalogProperty to store the properties for catalog.
@@ -43,127 +41,160 @@ import java.util.function.Function;
 public class CatalogProperty {
     private static final Logger LOG = 
LogManager.getLogger(CatalogProperty.class);
 
+    @Deprecated
     @SerializedName(value = "resource")
     private String resource;
+
     @SerializedName(value = "properties")
     private Map<String, String> properties;
 
+    // Lazy-loaded storage properties map, using volatile to ensure visibility
     private volatile Map<StorageProperties.Type, StorageProperties> 
storagePropertiesMap;
 
-    private MetastoreProperties metastoreProperties;
+    // Lazy-loaded metastore properties, using volatile to ensure visibility
+    private volatile MetastoreProperties metastoreProperties;
+
+    // Lazy-loaded backend storage properties, using volatile to ensure 
visibility
+    private volatile Map<String, String> backendStorageProperties;
 
-    private volatile Resource catalogResource = null;
+    // Lazy-loaded Hadoop properties, using volatile to ensure visibility
+    private volatile Map<String, String> hadoopProperties;
 
     public CatalogProperty(String resource, Map<String, String> properties) {
-        this.resource = Strings.nullToEmpty(resource);
+        this.resource = resource; // Keep but not used
         this.properties = properties;
         if (this.properties == null) {
             this.properties = Maps.newConcurrentMap();
         }
     }
 
-    private Resource catalogResource() {
-        if (!Strings.isNullOrEmpty(resource) && catalogResource == null) {
-            synchronized (this) {
-                if (catalogResource == null) {
-                    catalogResource = 
Env.getCurrentEnv().getResourceMgr().getResource(resource);
-                }
-            }
-        }
-        return catalogResource;
-    }
-
     public String getOrDefault(String key, String defaultVal) {
-        String val = properties.get(key);
-        if (val == null) {
-            Resource res = catalogResource();
-            if (res != null) {
-                val = res.getCopiedProperties().getOrDefault(key, defaultVal);
-            } else {
-                val = defaultVal;
-            }
-        }
-        return val;
+        return properties.getOrDefault(key, defaultVal);
     }
 
     public Map<String, String> getProperties() {
-        Map<String, String> mergedProperties = Maps.newHashMap();
-        if (!Strings.isNullOrEmpty(resource)) {
-            Resource res = catalogResource();
-            if (res != null) {
-                mergedProperties = res.getCopiedProperties();
-            }
-        }
-        mergedProperties.putAll(properties);
-        return mergedProperties;
-    }
-
-    public String getResource() {
-        return resource;
+        return Maps.newHashMap(properties);
     }
 
     public void modifyCatalogProps(Map<String, String> props) {
-        properties.putAll(PropertyConverter.convertToMetaProperties(props));
-        this.storagePropertiesMap = null;
-    }
-
-    private void reInitCatalogStorageProperties() {
-        List<StorageProperties> storageProperties;
-        try {
-            storageProperties = StorageProperties.createAll(getProperties());
-            this.storagePropertiesMap = (storageProperties.stream()
-                    
.collect(java.util.stream.Collectors.toMap(StorageProperties::getType, 
Function.identity())));
-        } catch (UserException e) {
-            throw new RuntimeException(e);
+        synchronized (this) {
+            
properties.putAll(PropertyConverter.convertToMetaProperties(props));
+            resetAllCaches();
         }
-
     }
 
     public void rollBackCatalogProps(Map<String, String> props) {
-        properties.clear();
-        properties = new HashMap<>(props);
-        this.storagePropertiesMap = null;
-    }
-
-
-    public Map<String, String> getHadoopProperties() {
-        Map<String, String> hadoopProperties = getProperties();
-        
hadoopProperties.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties()));
-        return hadoopProperties;
+        synchronized (this) {
+            properties = new HashMap<>(props);
+            resetAllCaches();
+        }
     }
 
     public void addProperty(String key, String val) {
-        this.properties.put(key, val);
-        this.storagePropertiesMap = null; // reset storage properties map
+        synchronized (this) {
+            this.properties.put(key, val);
+            resetAllCaches();
+        }
     }
 
     public void deleteProperty(String key) {
-        this.properties.remove(key);
+        synchronized (this) {
+            this.properties.remove(key);
+            resetAllCaches();
+        }
+    }
+
+    /**
+     * Unified cache reset method to ensure all caches are properly cleared
+     */
+    private void resetAllCaches() {
         this.storagePropertiesMap = null;
+        this.metastoreProperties = null;
+        this.backendStorageProperties = null;
+        this.hadoopProperties = null;
     }
 
+    /**
+     * Get storage properties map with lazy loading, using double-check 
locking to ensure thread safety
+     */
     public Map<StorageProperties.Type, StorageProperties> 
getStoragePropertiesMap() {
         if (storagePropertiesMap == null) {
             synchronized (this) {
                 if (storagePropertiesMap == null) {
-                    reInitCatalogStorageProperties();
+                    try {
+                        List<StorageProperties> storageProperties = 
StorageProperties.createAll(getProperties());
+                        this.storagePropertiesMap = storageProperties.stream()
+                                
.collect(Collectors.toMap(StorageProperties::getType, Function.identity()));
+                    } catch (UserException e) {
+                        LOG.warn("Failed to initialize catalog storage 
properties", e);
+                        throw new RuntimeException("Failed to initialize 
storage properties for catalog", e);
+                    }
                 }
             }
         }
         return storagePropertiesMap;
     }
 
+    /**
+     * Get metastore properties with lazy loading, using double-check locking 
to ensure thread safety
+     */
     public MetastoreProperties getMetastoreProperties() {
         if (MapUtils.isEmpty(getProperties())) {
             return null;
         }
+
         if (metastoreProperties == null) {
-            try {
-                metastoreProperties = 
MetastoreProperties.create(getProperties());
-            } catch (UserException e) {
-                throw new RuntimeException(e);
+            synchronized (this) {
+                if (metastoreProperties == null) {
+                    try {
+                        metastoreProperties = 
MetastoreProperties.create(getProperties());
+                    } catch (UserException e) {
+                        LOG.warn("Failed to create metastore properties", e);
+                        throw new RuntimeException("Failed to create metastore 
properties", e);
+                    }
+                }
             }
         }
         return metastoreProperties;
     }
+
+    /**
+     * Get backend storage properties with lazy loading, using double-check 
locking to ensure thread safety
+     */
+    public Map<String, String> getBackendStorageProperties() {
+        if (backendStorageProperties == null) {
+            synchronized (this) {
+                if (backendStorageProperties == null) {
+                    Map<String, String> result = new HashMap<>();
+                    Map<StorageProperties.Type, StorageProperties> storageMap 
= getStoragePropertiesMap();
+
+                    for (StorageProperties sp : storageMap.values()) {
+                        Map<String, String> backendProps = 
sp.getBackendConfigProperties();
+                        if (backendProps != null) {
+                            result.putAll(backendProps);
+                        }
+                    }
+
+                    this.backendStorageProperties = result;
+                }
+            }
+        }
+        return backendStorageProperties;
+    }
+
+    /**
+     * Get Hadoop properties with lazy loading, using double-check locking to 
ensure thread safety
+     */
+    public Map<String, String> getHadoopProperties() {
+        if (hadoopProperties == null) {
+            synchronized (this) {
+                if (hadoopProperties == null) {
+                    Map<String, String> result = getProperties();
+                    
result.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties()));
+                    this.hadoopProperties = result;
+                }
+            }
+        }
+        return hadoopProperties;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 059226b5f5e..e590b221e37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -675,11 +675,6 @@ public abstract class ExternalCatalog
                 + ", table id: " + tableId);
     }
 
-    @Override
-    public String getResource() {
-        return catalogProperty.getResource();
-    }
-
     @Nullable
     @Override
     public ExternalDatabase<? extends ExternalTable> getDbNullable(String 
dbName) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 5e54673d359..a156184e67c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -601,14 +601,8 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         return catalog.getCatalogProperty().getStoragePropertiesMap();
     }
 
-    public Map<String, String> getHadoopProperties() {
-        return getStoragePropertiesMap().values().stream()
-                .flatMap(m -> 
m.getBackendConfigProperties().entrySet().stream())
-                .collect(Collectors.toMap(
-                        Map.Entry::getKey,
-                        Map.Entry::getValue,
-                        (v1, v2) -> v2));
-
+    public Map<String, String> getBackendStorageProperties() {
+        return catalog.getCatalogProperty().getBackendStorageProperties();
     }
 
     public List<ColumnStatisticsObj> getHiveTableColumnStats(List<String> 
columns) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 2a0953f052e..bf714c6d89e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -477,8 +477,8 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 
     @Override
-    protected Map<String, String> getLocationProperties() throws UserException 
 {
-        return hmsTable.getHadoopProperties();
+    protected Map<String, String> getLocationProperties() {
+        return hmsTable.getBackendStorageProperties();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index bb1ca6a25fc..0e17e49f3ae 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -231,12 +231,12 @@ public class HudiScanNode extends HiveScanNode {
     }
 
     @Override
-    protected Map<String, String> getLocationProperties() throws UserException 
{
+    protected Map<String, String> getLocationProperties() {
         if (incrementalRead) {
             return incrementalRelation.getHoodieParams();
         } else {
             // HudiJniScanner uses hadoop client to read data.
-            return hmsTable.getHadoopProperties();
+            return hmsTable.getBackendStorageProperties();
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 2d87889d50e..b1178363b9b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -23,18 +23,14 @@ import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
 import org.apache.doris.datasource.operations.ExternalMetadataOperations;
-import org.apache.doris.datasource.property.PropertyConverter;
 import 
org.apache.doris.datasource.property.metastore.AbstractIcebergProperties;
 import org.apache.doris.datasource.property.metastore.MetastoreProperties;
 import org.apache.doris.transaction.TransactionManagerFactory;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.iceberg.catalog.Catalog;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 public abstract class IcebergExternalCatalog extends ExternalCatalog {
 
@@ -142,11 +138,6 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
         }
     }
 
-    protected void initS3Param(Configuration conf) {
-        Map<String, String> properties = catalogProperty.getHadoopProperties();
-        conf.set(Constants.AWS_CREDENTIALS_PROVIDER, 
PropertyConverter.getAWSCredentialsProviders(properties));
-    }
-
     @Override
     public boolean viewExists(String dbName, String viewName) {
         return metadataOps.viewExists(dbName, viewName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index 85e3b29e1a3..e8e92e60ac1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -361,7 +361,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
         jdbcTable.setDriverClass(this.getDriverClass());
         jdbcTable.setDriverUrl(this.getDriverUrl());
         jdbcTable.setCheckSum(this.getCheckSum());
-        jdbcTable.setResourceName(this.getResource());
+        jdbcTable.setResourceName("");
         jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
         jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
         
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
index 00363c1f799..fba59545922 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
@@ -17,38 +17,12 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.Map;
 
 public class PaimonDLFExternalCatalog extends PaimonExternalCatalog {
-    private static final Logger LOG = 
LogManager.getLogger(PaimonDLFExternalCatalog.class);
 
     public PaimonDLFExternalCatalog(long catalogId, String name, String 
resource,
                                     Map<String, String> props, String comment) 
{
         super(catalogId, name, resource, props, comment);
     }
-
-    @Override
-    protected void initLocalObjectsImpl() {
-        super.initLocalObjectsImpl();
-        catalogType = PAIMON_DLF;
-        catalog = createCatalog();
-    }
-
-    @Override
-    protected void setPaimonCatalogOptions(Map<String, String> properties, 
Map<String, String> options) {
-        options.put(PaimonProperties.PAIMON_CATALOG_TYPE, 
PaimonProperties.PAIMON_HMS_CATALOG);
-        options.put(PaimonProperties.PAIMON_METASTORE_CLIENT, 
ProxyMetaStoreClient.class.getName());
-        options.put(PaimonProperties.PAIMON_OSS_ENDPOINT,
-                properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT));
-        options.put(PaimonProperties.PAIMON_OSS_ACCESS_KEY,
-                properties.get(PaimonProperties.PAIMON_OSS_ACCESS_KEY));
-        options.put(PaimonProperties.PAIMON_OSS_SECRET_KEY,
-                properties.get(PaimonProperties.PAIMON_OSS_SECRET_KEY));
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index 93252d61d58..b8decdc032d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -18,38 +18,28 @@
 package org.apache.doris.datasource.paimon;
 
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-import 
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.NameMapping;
 import org.apache.doris.datasource.SessionContext;
-import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties;
+import org.apache.doris.datasource.property.metastore.MetastoreProperties;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Catalog.TableNotExistException;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public abstract class PaimonExternalCatalog extends ExternalCatalog {
+public class PaimonExternalCatalog extends ExternalCatalog {
     private static final Logger LOG = 
LogManager.getLogger(PaimonExternalCatalog.class);
     public static final String PAIMON_CATALOG_TYPE = "paimon.catalog.type";
     public static final String PAIMON_FILESYSTEM = "filesystem";
@@ -57,35 +47,32 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
     public static final String PAIMON_DLF = "dlf";
     protected String catalogType;
     protected Catalog catalog;
-    protected AuthenticationConfig authConf;
-    protected HadoopAuthenticator hadoopAuthenticator;
 
-    private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
-            PaimonProperties.WAREHOUSE
-    );
+    private AbstractPaimonProperties paimonProperties;
 
-    public PaimonExternalCatalog(long catalogId, String name, String resource,
-                                 Map<String, String> props, String comment) {
+    public PaimonExternalCatalog(long catalogId, String name, String resource, 
Map<String, String> props,
+                                 String comment) {
         super(catalogId, name, InitCatalogLog.Type.PAIMON, comment);
-        props = PropertyConverter.convertToMetaProperties(props);
         catalogProperty = new CatalogProperty(resource, props);
     }
 
     @Override
     protected void initLocalObjectsImpl() {
-        Configuration conf = 
DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth());
-        for (Map.Entry<String, String> propEntry : 
this.catalogProperty.getHadoopProperties().entrySet()) {
-            conf.set(propEntry.getKey(), propEntry.getValue());
+        try {
+            paimonProperties = (AbstractPaimonProperties) 
MetastoreProperties.create(catalogProperty.getProperties());
+        } catch (UserException e) {
+            throw new IllegalArgumentException("Failed to create Paimon 
properties from catalog properties,exception: "
+                    + ExceptionUtils.getRootCauseMessage(e), e);
         }
-        authConf = AuthenticationConfig.getKerberosConfig(conf);
-        hadoopAuthenticator = 
HadoopAuthenticator.getHadoopAuthenticator(authConf);
+        catalogType = paimonProperties.getPaimonCatalogType();
+        catalog = createCatalog();
         initPreExecutionAuthenticator();
     }
 
     @Override
     protected synchronized void initPreExecutionAuthenticator() {
         if (executionAuthenticator == null) {
-            executionAuthenticator = new 
HadoopExecutionAuthenticator(hadoopAuthenticator);
+            executionAuthenticator = 
paimonProperties.getExecutionAuthenticator();
         }
     }
 
@@ -96,8 +83,8 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
 
     protected List<String> listDatabaseNames() {
         try {
-            return hadoopAuthenticator.doAs(() -> new 
ArrayList<>(catalog.listDatabases()));
-        } catch (IOException e) {
+            return executionAuthenticator.execute(() -> new 
ArrayList<>(catalog.listDatabases()));
+        } catch (Exception e) {
             throw new RuntimeException("Failed to list databases names, 
catalog name: " + getName(), e);
         }
     }
@@ -106,7 +93,7 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
     public boolean tableExist(SessionContext ctx, String dbName, String 
tblName) {
         makeSureInitialized();
         try {
-            return hadoopAuthenticator.doAs(() -> {
+            return executionAuthenticator.execute(() -> {
                 try {
                     catalog.getTable(Identifier.create(dbName, tblName));
                     return true;
@@ -115,8 +102,9 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
                 }
             });
 
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check table existence, 
catalog name: " + getName(), e);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to check table existence, 
catalog name: " + getName()
+                    + "error message is:" + 
ExceptionUtils.getRootCauseMessage(e), e);
         }
     }
 
@@ -124,7 +112,7 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
     public List<String> listTableNames(SessionContext ctx, String dbName) {
         makeSureInitialized();
         try {
-            return hadoopAuthenticator.doAs(() -> {
+            return executionAuthenticator.execute(() -> {
                 List<String> tableNames = null;
                 try {
                     tableNames = catalog.listTables(dbName);
@@ -133,7 +121,7 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
                 }
                 return tableNames;
             });
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new RuntimeException("Failed to list table names, catalog 
name: " + getName(), e);
         }
     }
@@ -141,32 +129,31 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
     public org.apache.paimon.table.Table getPaimonTable(NameMapping 
nameMapping) {
         makeSureInitialized();
         try {
-            return hadoopAuthenticator.doAs(() -> catalog.getTable(
-                    Identifier.create(nameMapping.getRemoteDbName(), 
nameMapping.getRemoteTblName())));
+            return executionAuthenticator.execute(() -> 
catalog.getTable(Identifier.create(nameMapping
+                    .getRemoteDbName(), nameMapping.getRemoteTblName())));
         } catch (Exception e) {
-            throw new RuntimeException("Failed to get Paimon table:" + 
getName() + "."
-                    + nameMapping.getLocalDbName() + "." + 
nameMapping.getLocalTblName() + ", because "
-                    + e.getMessage(), e);
+            throw new RuntimeException("Failed to get Paimon table:" + 
getName() + "." + nameMapping.getLocalDbName()
+                    + "." + nameMapping.getLocalTblName() + ", because " + 
ExceptionUtils.getRootCauseMessage(e), e);
         }
     }
 
     public List<Partition> getPaimonPartitions(NameMapping nameMapping) {
         makeSureInitialized();
         try {
-            return hadoopAuthenticator.doAs(() -> {
+            return executionAuthenticator.execute(() -> {
                 List<Partition> partitions = new ArrayList<>();
                 try {
-                    partitions = catalog.listPartitions(
-                            Identifier.create(nameMapping.getRemoteDbName(), 
nameMapping.getRemoteTblName()));
+                    partitions = 
catalog.listPartitions(Identifier.create(nameMapping.getRemoteDbName(),
+                            nameMapping.getRemoteTblName()));
                 } catch (Catalog.TableNotExistException e) {
                     LOG.warn("TableNotExistException", e);
                 }
                 return partitions;
             });
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new RuntimeException("Failed to get Paimon table 
partitions:" + getName() + "."
-                    + nameMapping.getRemoteDbName() + "." + 
nameMapping.getRemoteTblName()
-                    + ", because " + e.getMessage(), e);
+                    + nameMapping.getRemoteDbName() + "." + 
nameMapping.getRemoteTblName() + ", because "
+                    + ExceptionUtils.getRootCauseMessage(e), e);
         }
     }
 
@@ -174,79 +161,44 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
         return getPaimonSystemTable(nameMapping, null, queryType);
     }
 
-    public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping 
nameMapping, String branch,
-            String queryType) {
+    public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping 
nameMapping,
+                                                              String branch, 
String queryType) {
         makeSureInitialized();
         try {
-            return hadoopAuthenticator.doAs(() -> catalog.getTable(new 
Identifier(nameMapping.getRemoteDbName(),
+            return executionAuthenticator.execute(() -> catalog.getTable(new 
Identifier(nameMapping.getRemoteDbName(),
                     nameMapping.getRemoteTblName(), branch, queryType)));
         } catch (Exception e) {
             throw new RuntimeException("Failed to get Paimon system table:" + 
getName() + "."
                     + nameMapping.getRemoteDbName() + "." + 
nameMapping.getRemoteTblName() + "$" + queryType
-                    + ", because " + e.getMessage(), e);
+                    + ", because " + ExceptionUtils.getRootCauseMessage(e), e);
         }
     }
 
-    protected String getPaimonCatalogType(String catalogType) {
-        if (PAIMON_HMS.equalsIgnoreCase(catalogType)) {
-            return PaimonProperties.PAIMON_HMS_CATALOG;
-        } else {
-            return PaimonProperties.PAIMON_FILESYSTEM_CATALOG;
-        }
-    }
 
     protected Catalog createCatalog() {
         try {
-            return hadoopAuthenticator.doAs(() -> {
-                Options options = new Options();
-                Map<String, String> paimonOptionsMap = getPaimonOptionsMap();
-                for (Map.Entry<String, String> kv : 
paimonOptionsMap.entrySet()) {
-                    options.set(kv.getKey(), kv.getValue());
-                }
-                CatalogContext context = CatalogContext.create(options, 
getConfiguration());
-                return createCatalogImpl(context);
-            });
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to create catalog, catalog 
name: " + getName(), e);
+            return paimonProperties.initializeCatalog(getName(), new 
ArrayList<>(catalogProperty
+                    .getStoragePropertiesMap().values()));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create catalog, catalog 
name: " + getName() + ", exception: "
+                    + ExceptionUtils.getRootCauseMessage(e), e);
         }
     }
 
-    protected Catalog createCatalogImpl(CatalogContext context) {
-        return CatalogFactory.createCatalog(context);
-    }
-
     public Map<String, String> getPaimonOptionsMap() {
-        Map<String, String> properties = catalogProperty.getHadoopProperties();
-        Map<String, String> options = Maps.newHashMap();
-        options.put(PaimonProperties.WAREHOUSE, 
properties.get(PaimonProperties.WAREHOUSE));
-        setPaimonCatalogOptions(properties, options);
-        setPaimonExtraOptions(properties, options);
-        return options;
-    }
-
-    protected abstract void setPaimonCatalogOptions(Map<String, String> 
properties, Map<String, String> options);
-
-    protected void setPaimonExtraOptions(Map<String, String> properties, 
Map<String, String> options) {
-        for (Map.Entry<String, String> kv : properties.entrySet()) {
-            if (kv.getKey().startsWith(PaimonProperties.PAIMON_PREFIX)) {
-                
options.put(kv.getKey().substring(PaimonProperties.PAIMON_PREFIX.length()), 
kv.getValue());
-            }
-        }
-
-        // hive version.
-        // This property is used for both FE and BE, so it has no "paimon." 
prefix.
-        // We need to handle it separately.
-        if (properties.containsKey(HMSProperties.HIVE_VERSION)) {
-            options.put(HMSProperties.HIVE_VERSION, 
properties.get(HMSProperties.HIVE_VERSION));
-        }
+        makeSureInitialized();
+        return paimonProperties.getCatalogOptionsMap();
     }
 
     @Override
     public void checkProperties() throws DdlException {
-        super.checkProperties();
-        for (String requiredProperty : REQUIRED_PROPERTIES) {
-            if 
(!catalogProperty.getProperties().containsKey(requiredProperty)) {
-                throw new DdlException("Required property '" + 
requiredProperty + "' is missing");
+        if (null != paimonProperties) {
+            try {
+                this.paimonProperties = (AbstractPaimonProperties) 
MetastoreProperties
+                        .create(catalogProperty.getProperties());
+            } catch (UserException e) {
+                throw new DdlException("Failed to create Paimon properties 
from catalog properties, exception: "
+                        + ExceptionUtils.getRootCauseMessage(e), e);
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
index 53e790d8c9e..b904ac169ff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
@@ -19,7 +19,6 @@ package org.apache.doris.datasource.paimon;
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.property.constants.HMSProperties;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -36,12 +35,9 @@ public class PaimonExternalCatalogFactory {
         metastoreType = metastoreType.toLowerCase();
         switch (metastoreType) {
             case PaimonExternalCatalog.PAIMON_HMS:
-                return new PaimonHMSExternalCatalog(catalogId, name, resource, 
props, comment);
             case PaimonExternalCatalog.PAIMON_FILESYSTEM:
-                return new PaimonFileExternalCatalog(catalogId, name, 
resource, props, comment);
             case PaimonExternalCatalog.PAIMON_DLF:
-                props.put(HMSProperties.HIVE_METASTORE_TYPE, 
HMSProperties.DLF_TYPE);
-                return new PaimonDLFExternalCatalog(catalogId, name, resource, 
props, comment);
+                return new PaimonExternalCatalog(catalogId, name, resource, 
props, comment);
             default:
                 throw new DdlException("Unknown " + 
PaimonExternalCatalog.PAIMON_CATALOG_TYPE
                         + " value: " + metastoreType);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
index e74f3deeaf5..c2c3e139567 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
@@ -17,77 +17,12 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.constants.CosProperties;
-import org.apache.doris.datasource.property.constants.ObsProperties;
-import org.apache.doris.datasource.property.constants.OssProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.Map;
 
-
 public class PaimonFileExternalCatalog extends PaimonExternalCatalog {
-    private static final Logger LOG = 
LogManager.getLogger(PaimonFileExternalCatalog.class);
 
     public PaimonFileExternalCatalog(long catalogId, String name, String 
resource,
             Map<String, String> props, String comment) {
         super(catalogId, name, resource, props, comment);
     }
-
-    @Override
-    protected void initLocalObjectsImpl() {
-        super.initLocalObjectsImpl();
-        catalogType = PAIMON_FILESYSTEM;
-        catalog = createCatalog();
-    }
-
-    @Override
-    protected void setPaimonCatalogOptions(Map<String, String> properties, 
Map<String, String> options) {
-        if (properties.containsKey(PaimonProperties.PAIMON_S3_ENDPOINT)) {
-            options.put(PaimonProperties.PAIMON_S3_ENDPOINT,
-                    properties.get(PaimonProperties.PAIMON_S3_ENDPOINT));
-            options.put(PaimonProperties.PAIMON_S3_ACCESS_KEY,
-                    properties.get(PaimonProperties.PAIMON_S3_ACCESS_KEY));
-            options.put(PaimonProperties.PAIMON_S3_SECRET_KEY,
-                    properties.get(PaimonProperties.PAIMON_S3_SECRET_KEY));
-        } else if 
(properties.containsKey(PaimonProperties.PAIMON_OSS_ENDPOINT)) {
-            boolean hdfsEnabled = Boolean.parseBoolean(properties.getOrDefault(
-                    OssProperties.OSS_HDFS_ENABLED, "false"));
-            if 
(!LocationPath.isHdfsOnOssEndpoint(properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT))
-                    && !hdfsEnabled) {
-                options.put(PaimonProperties.PAIMON_OSS_ENDPOINT,
-                        properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT));
-                options.put(PaimonProperties.PAIMON_OSS_ACCESS_KEY,
-                        
properties.get(PaimonProperties.PAIMON_OSS_ACCESS_KEY));
-                options.put(PaimonProperties.PAIMON_OSS_SECRET_KEY,
-                        
properties.get(PaimonProperties.PAIMON_OSS_SECRET_KEY));
-            }
-        } else if (properties.containsKey(CosProperties.ENDPOINT)) {
-            options.put(PaimonProperties.PAIMON_S3_ENDPOINT,
-                    properties.get(CosProperties.ENDPOINT));
-            options.put(PaimonProperties.PAIMON_S3_ACCESS_KEY,
-                    properties.get(CosProperties.ACCESS_KEY));
-            options.put(PaimonProperties.PAIMON_S3_SECRET_KEY,
-                    properties.get(CosProperties.SECRET_KEY));
-            options.put(PaimonProperties.WAREHOUSE,
-                    options.get(PaimonProperties.WAREHOUSE).replace("cosn://", 
"s3://"));
-        } else if (properties.containsKey(ObsProperties.ENDPOINT)) {
-            options.put(PaimonProperties.PAIMON_S3_ENDPOINT,
-                    properties.get(ObsProperties.ENDPOINT));
-            options.put(PaimonProperties.PAIMON_S3_ACCESS_KEY,
-                    properties.get(ObsProperties.ACCESS_KEY));
-            options.put(PaimonProperties.PAIMON_S3_SECRET_KEY,
-                    properties.get(ObsProperties.SECRET_KEY));
-            options.put(PaimonProperties.WAREHOUSE,
-                    options.get(PaimonProperties.WAREHOUSE).replace("obs://", 
"s3://"));
-        }
-
-        if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
-            options.put(PaimonProperties.S3_PATH_STYLE, 
properties.get(PropertyConverter.USE_PATH_STYLE));
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
index fb27bb56696..ae5cf7073dd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
@@ -17,48 +17,12 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
 import java.util.Map;
 
 public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {
-    private static final Logger LOG = 
LogManager.getLogger(PaimonHMSExternalCatalog.class);
-    private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
-            HMSProperties.HIVE_METASTORE_URIS
-    );
 
     public PaimonHMSExternalCatalog(long catalogId, String name, String 
resource,
             Map<String, String> props, String comment) {
         super(catalogId, name, resource, props, comment);
     }
-
-    @Override
-    protected void initLocalObjectsImpl() {
-        super.initLocalObjectsImpl();
-        catalogType = PAIMON_HMS;
-        catalog = createCatalog();
-    }
-
-    @Override
-    protected void setPaimonCatalogOptions(Map<String, String> properties, 
Map<String, String> options) {
-        options.put(PaimonProperties.PAIMON_CATALOG_TYPE, 
getPaimonCatalogType(catalogType));
-        options.put(PaimonProperties.HIVE_METASTORE_URIS, 
properties.get(HMSProperties.HIVE_METASTORE_URIS));
-    }
-
-    @Override
-    public void checkProperties() throws DdlException {
-        super.checkProperties();
-        for (String requiredProperty : REQUIRED_PROPERTIES) {
-            if 
(!catalogProperty.getProperties().containsKey(requiredProperty)) {
-                throw new DdlException("Required property '" + 
requiredProperty + "' is missing");
-            }
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 0d0bf8d3b32..9eecaa20d35 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -139,9 +139,9 @@ public class PaimonScanNode extends FileQueryScanNode {
     protected ConcurrentHashMap<Long, Boolean> currentQuerySchema = new 
ConcurrentHashMap<>();
 
     public PaimonScanNode(PlanNodeId id,
-            TupleDescriptor desc,
-            boolean needCheckColumnPriv,
-            SessionVariable sv) {
+                          TupleDescriptor desc,
+                          boolean needCheckColumnPriv,
+                          SessionVariable sv) {
         super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, 
needCheckColumnPriv, sv);
     }
 
@@ -421,14 +421,8 @@ public class PaimonScanNode extends FileQueryScanNode {
     }
 
     @Override
-    public Map<String, String> getLocationProperties() throws 
MetaNotFoundException, DdlException {
-        HashMap<String, String> map = new 
HashMap<>(source.getCatalog().getProperties());
-        
source.getCatalog().getCatalogProperty().getHadoopProperties().forEach((k, v) 
-> {
-            if (!map.containsKey(k)) {
-                map.put(k, v);
-            }
-        });
-        return map;
+    protected Map<String, String> getLocationProperties() {
+        return 
source.getCatalog().getCatalogProperty().getBackendStorageProperties();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
new file mode 100644
index 00000000000..b5e12a4970c
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.S3Properties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractPaimonProperties extends MetastoreProperties {
+    @ConnectorProperty(
+            names = {"warehouse"},
+            description = "The location of the Paimon warehouse. This is where 
the tables will be stored."
+    )
+    protected String warehouse;
+
+    @Getter
+    protected ExecutionAuthenticator executionAuthenticator = new 
ExecutionAuthenticator() {
+    };
+
+    @Getter
+    protected Options catalogOptions;
+
+    private final AtomicReference<Map<String, String>> catalogOptionsMapRef = 
new AtomicReference<>();
+
+    public abstract String getPaimonCatalogType();
+
+    private static final String USER_PROPERTY_PREFIX = "paimon.";
+
+    /**
+     * Creates the Paimon metastore properties from the raw catalog properties.
+     *
+     * @param props the property map, handed unchanged to the {@code MetastoreProperties} constructor
+     */
+    protected AbstractPaimonProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    public abstract Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList);
+
+    /**
+     * Adapt S3 storage properties for Apache Paimon's S3 file system.
+     *
+     * <p>Paimon's S3 file system does not follow the standard Hadoop-compatible
+     * configuration keys (like fs.s3a.access.key). Instead, it expects specific
+     * keys such as "s3.access.key", "s3.secret.key", etc.
+     *
+     * <p>Therefore, we explicitly map our internal S3 configuration (usually designed
+     * for HDFS-compatible systems) to Paimon's expected format. Only the first storage
+     * backend whose type is {@code StorageProperties.Type.S3} is used; when there is
+     * none, the catalog options are left untouched.
+     *
+     * <p>See: org.apache.paimon.s3.S3Loader
+     *
+     * @param storagePropertiesList the list of configured storage backends
+     */
+    protected void appendS3PropertiesIsNeeded(List<StorageProperties> storagePropertiesList) {
+
+        S3Properties s3Properties = (S3Properties) storagePropertiesList.stream()
+                .filter(storageProperties -> storageProperties.getType() == StorageProperties.Type.S3)
+                .findFirst()
+                .orElse(null);
+        if (s3Properties != null) {
+            // Each option must be fed from its matching getter: access key -> "s3.access.key",
+            // secret key -> "s3.secret.key". Crossing them makes every S3 request fail authentication.
+            catalogOptions.set("s3.access.key", s3Properties.getAccessKey());
+            catalogOptions.set("s3.secret.key", s3Properties.getSecretKey());
+            catalogOptions.set("s3.endpoint", s3Properties.getEndpoint());
+            catalogOptions.set("s3.region", s3Properties.getRegion());
+        }
+
+    }
+
+    /**
+     * Appends the options shared by every Paimon catalog flavour to {@code catalogOptions}.
+     *
+     * <p>In order: the warehouse location (when not blank), the metastore type returned by
+     * {@link #getMetastoreType()}, every original property whose key starts with
+     * {@code "paimon."} (case-insensitively) with that prefix stripped, and finally the
+     * S3 mappings from {@link #appendS3PropertiesIsNeeded(List)}.
+     *
+     * @param storagePropertiesList the list of configured storage backends
+     */
+    protected void appendCatalogOptions(List<StorageProperties> storagePropertiesList) {
+        if (StringUtils.isNotBlank(warehouse)) {
+            catalogOptions.set(CatalogOptions.WAREHOUSE.key(), warehouse);
+        }
+        catalogOptions.set(CatalogOptions.METASTORE.key(), getMetastoreType());
+        origProps.forEach((k, v) -> {
+            // Lower-case with Locale.ROOT: the default locale may map 'I' to a dotless 'ı'
+            // (e.g. Turkish), which would make "PAIMON." keys miss the prefix check.
+            if (k.toLowerCase(java.util.Locale.ROOT).startsWith(USER_PROPERTY_PREFIX)) {
+                String newKey = k.substring(USER_PROPERTY_PREFIX.length());
+                if (StringUtils.isNotBlank(newKey)) {
+                    catalogOptions.set(newKey, v);
+                }
+            }
+        });
+        appendS3PropertiesIsNeeded(storagePropertiesList);
+    }
+
+    /**
+     * Build catalog options including common and subclass-specific ones.
+     *
+     * <p>Replaces {@code catalogOptions} with a fresh {@code Options} instance, fills it via
+     * {@link #appendCatalogOptions(List)} and {@link #appendCustomCatalogOptions()}, and then
+     * drops any map previously cached by {@link #getCatalogOptionsMap()} so that the next call
+     * reflects the rebuilt options.
+     *
+     * @param storagePropertiesList the list of configured storage backends
+     */
+    public void buildCatalogOptions(List<StorageProperties> storagePropertiesList) {
+        catalogOptions = new Options();
+        appendCatalogOptions(storagePropertiesList);
+        appendCustomCatalogOptions();
+        // Invalidate last, once the options are complete, so a stale or partial snapshot is not kept.
+        catalogOptionsMapRef.set(null);
+    }
+
+    /**
+     * Returns the catalog options as a plain key/value map, building and caching it on first use.
+     *
+     * @return the cached map when one is already published, otherwise a newly built copy of
+     *         {@code catalogOptions}
+     * @throws IllegalStateException if {@code catalogOptions} is still {@code null}
+     */
+    public Map<String, String> getCatalogOptionsMap() {
+        Map<String, String> snapshot = catalogOptionsMapRef.get();
+        if (snapshot == null) {
+            // Nothing cached yet, so the source options must already exist.
+            if (catalogOptions == null) {
+                throw new IllegalStateException("Catalog options have not been initialized. Call"
+                        + " buildCatalogOptions first.");
+            }
+
+            // Copy every option key/value into a fresh map.
+            Map<String, String> candidate = new HashMap<>();
+            for (String optionKey : catalogOptions.keySet()) {
+                candidate.put(optionKey, catalogOptions.get(optionKey));
+            }
+
+            // Publish only if the reference is still empty; otherwise adopt what is stored there.
+            snapshot = catalogOptionsMapRef.compareAndSet(null, candidate)
+                    ? candidate
+                    : catalogOptionsMapRef.get();
+        }
+        return snapshot;
+    }
+
+
+    /**
+     * Hook method for subclasses to append metastore-specific or custom 
catalog options.
+     *
+     * <p>This method is invoked after common catalog options (e.g., warehouse 
path,
+     * metastore type, user-defined keys, and S3 compatibility mappings) have 
been
+     * added to the {@link org.apache.paimon.options.Options} instance.
+     *
+     * <p>Subclasses should override this method to inject additional 
configuration
+     * required for their specific metastore or environment. For example:
+     *
+     * <ul>
+     *   <li>DLF-based catalog may require a custom metastore client 
class.</li>
+     *   <li>HMS-based catalog may include URI and client pool parameters.</li>
+     *   <li>Other environments may inject authentication, endpoint, or 
caching options.</li>
+     * </ul>
+     *
+     * <p>If the subclass does not require any special options beyond the 
common ones,
+     * it can safely leave this method empty.
+     */
+    protected abstract void appendCustomCatalogOptions();
+
+    /**
+     * Returns the metastore type identifier used by the Paimon catalog 
factory.
+     *
+     * <p>This identifier must match one of the known metastore types 
supported by
+     * Apache Paimon. Internally, the value returned here is used to configure 
the
+     * `metastore` option in {@code Options}, which determines the specific
+     * {@link org.apache.paimon.catalog.CatalogFactory} implementation to be 
used
+     * when instantiating the catalog.
+     *
+     * <p>You can find valid identifiers by reviewing implementations of the
+     * {@link org.apache.paimon.catalog.CatalogFactory} interface. Each 
implementation
+     * declares its identifier via a static {@code IDENTIFIER} field or 
equivalent constant.
+     *
+     * <p>Examples:
+     * <ul>
+     *   <li>{@code "filesystem"} - for {@link 
org.apache.paimon.catalog.FileSystemCatalogFactory}</li>
+     *   <li>{@code "hive"} - for {@link 
org.apache.paimon.hive.HiveCatalogFactory}</li>
+     * </ul>
+     *
+     * @return the metastore type identifier string
+     */
+    protected abstract String getMetastoreType();
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
index 3e5e2273b08..48cba3c0bcc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
@@ -46,6 +46,7 @@ public class MetastoreProperties extends ConnectionProperties 
{
     public enum Type {
         HMS("hms"),
         ICEBERG("iceberg"),
+        PAIMON("paimon"),
         GLUE("glue"),
         DLF("dlf"),
         DATAPROC("dataproc"),
@@ -83,6 +84,7 @@ public class MetastoreProperties extends ConnectionProperties 
{
         //subclasses should be registered here
         register(Type.HMS, new HMSPropertiesFactory());
         register(Type.ICEBERG, new IcebergPropertiesFactory());
+        register(Type.PAIMON, new PaimonPropertiesFactory());
     }
 
     public static void register(Type type, MetastorePropertiesFactory factory) 
{
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
new file mode 100644
index 00000000000..2f65dbad9d8
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.storage.OSSProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import com.aliyun.datalake.metastore.common.DataLakeConfig;
+import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.hive.HiveCatalogOptions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * PaimonAliyunDLFMetaStoreProperties
+ *
+ * <p>This class provides configuration support for using Apache Paimon with
+ * Aliyun Data Lake Formation (DLF) as the metastore. Although DLF is not an
+ * officially supported metastore type in Paimon, this implementation adapts
+ * DLF by treating it as a Hive Metastore (HMS) underneath, enabling
+ * interoperability with Paimon's HiveCatalog.
+ *
+ * <p>Key Characteristics:
+ * <ul>
+ *   <li>Internally uses HiveCatalog with custom HiveConf configured for 
Aliyun DLF.</li>
+ *   <li>Relies on {@link ProxyMetaStoreClient} to bridge DLF 
compatibility.</li>
+ *   <li>Requires Aliyun OSS as the storage backend. Other storage types are 
not
+ *       currently verified for compatibility.</li>
+ * </ul>
+ *
+ * <p>Note: This is an internal extension and not an officially supported 
Paimon
+ * metastore type. Future compatibility should be validated when upgrading 
Paimon
+ * or changing storage backends.
+ *
+ * @see org.apache.paimon.hive.HiveCatalog
+ * @see org.apache.paimon.catalog.CatalogFactory
+ * @see ProxyMetaStoreClient
+ */
+public class PaimonAliyunDLFMetaStoreProperties extends AbstractPaimonProperties {
+
+    /** DLF connection settings derived from the original properties during initialization. */
+    private AliyunDLFBaseProperties baseProperties;
+
+    protected PaimonAliyunDLFMetaStoreProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    /**
+     * Runs the parent initialization, then builds the DLF base properties from {@code origProps}.
+     */
+    @Override
+    public void initNormalizeAndCheckProps() {
+        super.initNormalizeAndCheckProps();
+        this.baseProperties = AliyunDLFBaseProperties.of(origProps);
+    }
+
+    /**
+     * Creates a fresh {@link HiveConf} populated with the DLF settings held by {@code baseProperties}.
+     *
+     * @return a new HiveConf carrying the DataLake catalog keys
+     */
+    private HiveConf buildHiveConf() {
+        final HiveConf dlfConf = new HiveConf();
+        dlfConf.set(DataLakeConfig.CATALOG_ACCESS_KEY_ID, baseProperties.dlfAccessKey);
+        dlfConf.set(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET, baseProperties.dlfSecretKey);
+        dlfConf.set(DataLakeConfig.CATALOG_ENDPOINT, baseProperties.dlfEndpoint);
+        dlfConf.set(DataLakeConfig.CATALOG_REGION_ID, baseProperties.dlfRegion);
+        dlfConf.set(DataLakeConfig.CATALOG_SECURITY_TOKEN, baseProperties.dlfSessionToken);
+        dlfConf.set(DataLakeConfig.CATALOG_USER_ID, baseProperties.dlfUid);
+        dlfConf.set(DataLakeConfig.CATALOG_ID, baseProperties.dlfCatalogId);
+        dlfConf.set(DataLakeConfig.CATALOG_PROXY_MODE, baseProperties.dlfProxyMode);
+        return dlfConf;
+    }
+
+    /**
+     * Creates the Paimon catalog on top of DLF.
+     *
+     * <p>The first storage entry whose type is OSS supplies the Hadoop storage configuration; its
+     * entries are copied into the catalog options and also added to the HiveConf as a resource.
+     *
+     * @param catalogName name of the catalog (not used by this implementation)
+     * @param storagePropertiesList storage backends configured for the catalog
+     * @return the catalog produced by {@link CatalogFactory}
+     * @throws IllegalStateException if no OSS entry is present, or the OSS entry is not an
+     *         {@link OSSProperties} instance
+     */
+    @Override
+    public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
+        HiveConf dlfConf = buildHiveConf();
+        buildCatalogOptions(storagePropertiesList);
+
+        // Pick the first OSS-typed storage entry, if any.
+        StorageProperties ossCandidate = null;
+        for (StorageProperties candidate : storagePropertiesList) {
+            if (candidate.getType() == StorageProperties.Type.OSS) {
+                ossCandidate = candidate;
+                break;
+            }
+        }
+        if (ossCandidate == null) {
+            throw new IllegalStateException("Paimon DLF metastore requires OSS storage properties.");
+        }
+        if (!(ossCandidate instanceof OSSProperties)) {
+            throw new IllegalStateException("Expected OSSProperties type.");
+        }
+
+        OSSProperties oss = (OSSProperties) ossCandidate;
+        for (Map.Entry<String, String> kv : oss.getHadoopStorageConfig()) {
+            catalogOptions.set(kv.getKey(), kv.getValue());
+        }
+        dlfConf.addResource(oss.getHadoopStorageConfig());
+
+        return CatalogFactory.createCatalog(CatalogContext.create(catalogOptions, dlfConf));
+    }
+
+    /**
+     * Registers {@link ProxyMetaStoreClient} as the metastore client class in the catalog options.
+     */
+    @Override
+    protected void appendCustomCatalogOptions() {
+        String clientClassName = ProxyMetaStoreClient.class.getName();
+        catalogOptions.set("metastore.client.class", clientClassName);
+    }
+
+    /**
+     * @return the Hive catalog identifier, since DLF is driven through Paimon's Hive catalog
+     */
+    @Override
+    protected String getMetastoreType() {
+        return HiveCatalogOptions.IDENTIFIER;
+    }
+
+    /**
+     * @return {@link PaimonExternalCatalog#PAIMON_DLF}
+     */
+    @Override
+    public String getPaimonCatalogType() {
+        return PaimonExternalCatalog.PAIMON_DLF;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
new file mode 100644
index 00000000000..adfa53eab94
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import 
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.storage.HdfsProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.FileSystemCatalogFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Metastore properties for a Paimon catalog that uses the filesystem catalog factory.
+ */
+public class PaimonFileSystemMetaStoreProperties extends AbstractPaimonProperties {
+
+    protected PaimonFileSystemMetaStoreProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    /**
+     * Creates the filesystem-backed Paimon catalog.
+     *
+     * <p>Every storage entry contributes its Hadoop storage configuration twice: each key/value is
+     * copied into the catalog options, and the configuration is added as a resource to a fresh
+     * Hadoop {@link Configuration} created without default resources. When an entry is of type
+     * HDFS, its Hadoop authenticator replaces the current execution authenticator.
+     *
+     * @param catalogName name of the catalog (not used by this implementation)
+     * @param storagePropertiesList storage backends configured for the catalog
+     * @return the catalog produced by {@link CatalogFactory}
+     * @throws RuntimeException wrapping any exception raised while creating the catalog
+     */
+    @Override
+    public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
+        buildCatalogOptions(storagePropertiesList);
+        Configuration hadoopConf = new Configuration(false);
+
+        for (StorageProperties storage : storagePropertiesList) {
+            for (Map.Entry<String, String> kv : storage.getHadoopStorageConfig()) {
+                catalogOptions.set(kv.getKey(), kv.getValue());
+            }
+            hadoopConf.addResource(storage.getHadoopStorageConfig());
+
+            if (storage.getType().equals(StorageProperties.Type.HDFS)) {
+                HdfsProperties hdfs = (HdfsProperties) storage;
+                this.executionAuthenticator = new HadoopExecutionAuthenticator(hdfs.getHadoopAuthenticator());
+            }
+        }
+
+        CatalogContext context = CatalogContext.create(catalogOptions, hadoopConf);
+        try {
+            return this.executionAuthenticator.execute(() -> CatalogFactory.createCatalog(context));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * The filesystem catalog needs no options beyond those built by the parent class.
+     */
+    @Override
+    protected void appendCustomCatalogOptions() {
+        // Intentionally empty.
+    }
+
+    /**
+     * @return the identifier declared by {@link FileSystemCatalogFactory}
+     */
+    @Override
+    protected String getMetastoreType() {
+        return FileSystemCatalogFactory.IDENTIFIER;
+    }
+
+    /**
+     * @return {@link PaimonExternalCatalog#PAIMON_FILESYSTEM}
+     */
+    @Override
+    public String getPaimonCatalogType() {
+        return PaimonExternalCatalog.PAIMON_FILESYSTEM;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
new file mode 100644
index 00000000000..7c2d2ee79a3
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import 
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.hive.HiveCatalogOptions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Metastore properties for a Paimon catalog backed by a Hive Metastore (HMS).
+ *
+ * <p>HMS connection settings come from {@link HMSBaseProperties}. Catalog creation is executed
+ * through a {@link HadoopExecutionAuthenticator} built from the HMS authenticator.
+ */
+public class PaimonHMSMetaStoreProperties extends AbstractPaimonProperties {
+
+    /** HMS settings derived from the original properties during initialization. */
+    private HMSBaseProperties hmsBaseProperties;
+
+    private static final String CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY = "client-pool-cache.eviction-interval-ms";
+
+    private static final String LOCATION_IN_PROPERTIES_KEY = "location-in-properties";
+
+    @ConnectorProperty(
+            names = {CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY},
+            required = false,
+            description = "Setting the client's pool cache eviction interval(ms).")
+    private long clientPoolCacheEvictionIntervalMs = TimeUnit.MINUTES.toMillis(5L);
+
+    @ConnectorProperty(
+            names = {LOCATION_IN_PROPERTIES_KEY},
+            required = false,
+            description = "Setting whether to use the location in the properties.")
+    private boolean locationInProperties = false;
+
+    /**
+     * Returns the Doris-side Paimon catalog type for this metastore.
+     *
+     * @return {@link PaimonExternalCatalog#PAIMON_HMS}; this class configures an HMS-backed catalog,
+     *         so it must not report the DLF type used by the Aliyun DLF variant
+     */
+    @Override
+    public String getPaimonCatalogType() {
+        return PaimonExternalCatalog.PAIMON_HMS;
+    }
+
+    protected PaimonHMSMetaStoreProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    /**
+     * Runs the parent initialization, builds and validates the HMS base properties from
+     * {@code origProps}, and installs an execution authenticator based on the HMS authenticator.
+     */
+    @Override
+    public void initNormalizeAndCheckProps() {
+        super.initNormalizeAndCheckProps();
+        hmsBaseProperties = HMSBaseProperties.of(origProps);
+        hmsBaseProperties.initAndCheckParams();
+        this.executionAuthenticator = new HadoopExecutionAuthenticator(hmsBaseProperties.getHmsAuthenticator());
+    }
+
+    /**
+     * Takes the Hive configuration held by {@code hmsBaseProperties} and adds every non-null
+     * Hadoop storage configuration to it as a resource.
+     *
+     * <p>The object returned by {@code getHiveConf()} is modified directly and then returned.
+     *
+     * @param storagePropertiesList storage backends configured for the catalog
+     * @return the Hive configuration with the storage configurations added
+     */
+    private Configuration buildHiveConfiguration(List<StorageProperties> storagePropertiesList) {
+        Configuration conf = hmsBaseProperties.getHiveConf();
+
+        for (StorageProperties sp : storagePropertiesList) {
+            if (sp.getHadoopStorageConfig() != null) {
+                conf.addResource(sp.getHadoopStorageConfig());
+            }
+        }
+        return conf;
+    }
+
+    /**
+     * Creates the HMS-backed Paimon catalog.
+     *
+     * <p>All entries of the merged Hive/storage configuration are also copied into the catalog
+     * options before the catalog is created under the execution authenticator.
+     *
+     * @param catalogName name of the catalog (not used by this implementation)
+     * @param storagePropertiesList storage backends configured for the catalog
+     * @return the catalog produced by {@link CatalogFactory}
+     * @throws RuntimeException wrapping any exception raised while creating the catalog
+     */
+    @Override
+    public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
+        Configuration conf = buildHiveConfiguration(storagePropertiesList);
+        buildCatalogOptions(storagePropertiesList);
+        for (Map.Entry<String, String> entry : conf) {
+            catalogOptions.set(entry.getKey(), entry.getValue());
+        }
+        CatalogContext catalogContext = CatalogContext.create(catalogOptions, conf);
+        try {
+            return executionAuthenticator.execute(() -> CatalogFactory.createCatalog(catalogContext));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create Paimon catalog with HMS metastore", e);
+        }
+    }
+
+    /**
+     * @return the Hive catalog identifier (see {@code org.apache.paimon.hive.HiveCatalogFactory})
+     */
+    @Override
+    protected String getMetastoreType() {
+        return HiveCatalogOptions.IDENTIFIER;
+    }
+
+    /**
+     * Adds the HMS-specific options: client pool cache eviction interval, the
+     * location-in-properties flag, and the metastore URI.
+     */
+    @Override
+    protected void appendCustomCatalogOptions() {
+        catalogOptions.set(CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY,
+                String.valueOf(clientPoolCacheEvictionIntervalMs));
+        catalogOptions.set(LOCATION_IN_PROPERTIES_KEY, String.valueOf(locationInProperties));
+        catalogOptions.set("uri", hmsBaseProperties.getHiveMetastoreUri());
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
new file mode 100644
index 00000000000..12dac3bb739
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import java.util.Map;
+
+/**
+ * Factory that selects the Paimon metastore properties implementation from the
+ * {@code paimon.catalog.type} property, falling back to {@code filesystem} as the default type.
+ */
+public class PaimonPropertiesFactory extends AbstractMetastorePropertiesFactory {
+
+    /** Property key that names the Paimon catalog type. */
+    private static final String CATALOG_TYPE_KEY = "paimon.catalog.type";
+
+    /** Catalog type passed to {@code createInternal} as the default. */
+    private static final String DEFAULT_CATALOG_TYPE = "filesystem";
+
+    /**
+     * Registers the constructors of the supported Paimon metastore variants.
+     */
+    public PaimonPropertiesFactory() {
+        register("dlf", PaimonAliyunDLFMetaStoreProperties::new);
+        register("filesystem", PaimonFileSystemMetaStoreProperties::new);
+        register("hms", PaimonHMSMetaStoreProperties::new);
+    }
+
+    /**
+     * Delegates to {@code createInternal} with the catalog type key and the default type.
+     *
+     * @param props catalog properties supplied by the user
+     * @return the metastore properties produced by {@code createInternal}
+     */
+    @Override
+    public MetastoreProperties create(Map<String, String> props) {
+        return createInternal(props, CATALOG_TYPE_KEY, DEFAULT_CATALOG_TYPE);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
index 8cec9d6a63d..b36e4309701 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
@@ -24,6 +24,7 @@ import com.google.common.base.Strings;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -257,8 +258,23 @@ public abstract class AbstractS3CompatibleProperties 
extends StorageProperties i
 
     @Override
     public void initializeHadoopStorageConfig() {
-        throw new UnsupportedOperationException("Hadoop storage config 
initialization is not"
-                + " supported for S3 compatible storage.");
+        hadoopStorageConfig = new Configuration();
+        // Compatibility note: Due to historical reasons, even when the 
underlying
+        // storage is OSS, OBS, etc., users may still configure the schema as 
"s3://".
+        // To ensure backward compatibility, we append S3-related properties 
by default.
+        appendS3HdfsProperties(hadoopStorageConfig);
+    }
+
+    private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
+        hadoopStorageConfig.set("fs.s3.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+        hadoopStorageConfig.set("fs.s3a.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+        hadoopStorageConfig.set("fs.s3a.endpoint", getEndpoint());
+        hadoopStorageConfig.set("fs.s3a.access.key", getAccessKey());
+        hadoopStorageConfig.set("fs.s3a.secret.key", getSecretKey());
+        hadoopStorageConfig.set("fs.s3a.connection.maximum", 
getMaxConnections());
+        hadoopStorageConfig.set("fs.s3a.connection.request.timeout", 
getRequestTimeoutS());
+        hadoopStorageConfig.set("fs.s3a.connection.timeout", 
getConnectionTimeoutS());
+        hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
index a2ae545f048..6fea7cf18ec 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 
@@ -130,17 +129,8 @@ public class COSProperties extends 
AbstractS3CompatibleProperties {
 
     @Override
     public void initializeHadoopStorageConfig() {
-        hadoopStorageConfig = new Configuration();
-        hadoopStorageConfig.set("fs.cos.impl", 
"org.apache.hadoop.fs.CosFileSystem");
-        hadoopStorageConfig.set("fs.cosn.impl", 
"org.apache.hadoop.fs.CosFileSystem");
-        hadoopStorageConfig.set("fs.AbstractFileSystem.cos.impl", 
"org.apache.hadoop.fs.Cos");
-        hadoopStorageConfig.set("fs.cos.secretId", accessKey);
-        hadoopStorageConfig.set("fs.cos.secretKey", secretKey);
-        hadoopStorageConfig.set("fs.cos.region", region);
-        hadoopStorageConfig.set("fs.cos.endpoint", endpoint);
-        hadoopStorageConfig.set("fs.cos.connection.timeout", 
connectionTimeoutS);
-        hadoopStorageConfig.set("fs.cos.connection.request.timeout", 
requestTimeoutS);
-        hadoopStorageConfig.set("fs.cos.connection.maximum", maxConnections);
-        hadoopStorageConfig.set("fs.cos.use.path.style", usePathStyle);
+        super.initializeHadoopStorageConfig();
+        hadoopStorageConfig.set("fs.cos.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+        hadoopStorageConfig.set("fs.cosn.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
index 0f5349e1452..75f9fc85881 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 
 import java.util.Map;
 import java.util.Set;
@@ -95,18 +94,4 @@ public class MinioProperties extends 
AbstractS3CompatibleProperties {
     protected Set<Pattern> endpointPatterns() {
         return 
ImmutableSet.of(Pattern.compile("^(?:https?://)?[a-zA-Z0-9.-]+(?::\\d+)?$"));
     }
-
-    @Override
-    public void initializeHadoopStorageConfig() {
-        hadoopStorageConfig = new Configuration();
-        hadoopStorageConfig.set("fs.s3.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
-        hadoopStorageConfig.set("fs.s3a.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
-        hadoopStorageConfig.set("fs.s3a.endpoint", endpoint);
-        hadoopStorageConfig.set("fs.s3a.access.key", accessKey);
-        hadoopStorageConfig.set("fs.s3a.secret.key", secretKey);
-        hadoopStorageConfig.set("fs.s3a.connection.maximum", maxConnections);
-        hadoopStorageConfig.set("fs.s3a.connection.request.timeout", 
requestTimeoutS);
-        hadoopStorageConfig.set("fs.s3a.connection.timeout", 
connectionTimeoutS);
-        hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
index 051dd9a927c..513727416ef 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 
@@ -135,13 +134,7 @@ public class OBSProperties extends 
AbstractS3CompatibleProperties {
 
     @Override
     public void initializeHadoopStorageConfig() {
-        hadoopStorageConfig = new Configuration();
-        hadoopStorageConfig.set("fs.obs.impl", 
"com.obs.services.hadoop.fs.OBSFileSystem");
-        hadoopStorageConfig.set("fs.obs.access.key", accessKey);
-        hadoopStorageConfig.set("fs.obs.secret.key", secretKey);
-        hadoopStorageConfig.set("fs.obs.endpoint", endpoint);
-        hadoopStorageConfig.set("fs.obs.connection.timeout", 
connectionTimeoutS);
-        hadoopStorageConfig.set("fs.obs.request.timeout", requestTimeoutS);
-        hadoopStorageConfig.set("fs.obs.connection.max", maxConnections);
+        super.initializeHadoopStorageConfig();
+        hadoopStorageConfig.set("fs.obs.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
index 1e34311f9e9..7b27a1ea160 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 
@@ -172,15 +171,7 @@ public class OSSProperties extends 
AbstractS3CompatibleProperties {
 
     @Override
     public void initializeHadoopStorageConfig() {
-        hadoopStorageConfig = new Configuration();
-        hadoopStorageConfig.set("fs.oss.impl", 
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
-        hadoopStorageConfig.set("fs.oss.endpoint", endpoint);
-        hadoopStorageConfig.set("fs.oss.region", region);
-        hadoopStorageConfig.set("fs.oss.accessKeyId", accessKey);
-        hadoopStorageConfig.set("fs.oss.accessKeySecret", secretKey);
-        hadoopStorageConfig.set("fs.oss.connection.timeout", 
connectionTimeoutS);
-        hadoopStorageConfig.set("fs.oss.connection.max", maxConnections);
-        hadoopStorageConfig.set("fs.oss.connection.request.timeout", 
requestTimeoutS);
-        hadoopStorageConfig.set("fs.oss.use.path.style.access", usePathStyle);
+        super.initializeHadoopStorageConfig();
+        hadoopStorageConfig.set("fs.oss.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index e664edc6188..19a9195ec2f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -23,11 +23,9 @@ import 
org.apache.doris.datasource.property.storage.exception.StoragePropertiesE
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
@@ -39,8 +37,6 @@ import 
software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsPr
 import software.amazon.awssdk.services.sts.StsClient;
 import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
 
-import java.lang.reflect.Field;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -219,34 +215,6 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
         return ENDPOINT_PATTERN;
     }
 
-    private static List<Field> getIdentifyFields() {
-        List<Field> fields = Lists.newArrayList();
-        try {
-            //todo AliyunDlfProperties should in OSS storage type.
-            fields.add(S3Properties.class.getDeclaredField("s3AccessKey"));
-            // fixme Add it when MS done
-            
//fields.add(AliyunDLFProperties.class.getDeclaredField("dlfAccessKey"));
-            
//fields.add(AWSGlueProperties.class.getDeclaredField("glueAccessKey"));
-            return fields;
-        } catch (NoSuchFieldException e) {
-            // should not happen
-            throw new RuntimeException("Failed to get field: " + 
e.getMessage(), e);
-        }
-    }
-
-    /*
-    public void toPaimonOSSFileIOProperties(Options options) {
-        options.set("fs.oss.endpoint", s3Endpoint);
-        options.set("fs.oss.accessKeyId", s3AccessKey);
-        options.set("fs.oss.accessKeySecret", s3SecretKey);
-    }
-
-    public void toPaimonS3FileIOProperties(Options options) {
-        options.set("s3.endpoint", s3Endpoint);
-        options.set("s3.access-key", s3AccessKey);
-        options.set("s3.secret-key", s3SecretKey);
-    }*/
-
     @Override
     public Map<String, String> getBackendConfigProperties() {
         Map<String, String> backendProperties = 
generateBackendS3Configuration(s3ConnectionMaximum,
@@ -297,18 +265,17 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
                 InstanceProfileCredentialsProvider.create());
     }
 
-
     @Override
     public void initializeHadoopStorageConfig() {
-        hadoopStorageConfig = new Configuration();
-        hadoopStorageConfig.set("fs.s3.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
-        hadoopStorageConfig.set("fs.s3a.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
-        hadoopStorageConfig.set("fs.s3a.endpoint", endpoint);
-        hadoopStorageConfig.set("fs.s3a.access.key", accessKey);
-        hadoopStorageConfig.set("fs.s3a.secret.key", secretKey);
-        hadoopStorageConfig.set("fs.s3a.connection.maximum", 
s3ConnectionMaximum);
-        hadoopStorageConfig.set("fs.s3a.connection.request.timeout", 
s3ConnectionRequestTimeoutS);
-        hadoopStorageConfig.set("fs.s3a.connection.timeout", 
s3ConnectionTimeoutS);
-        hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
+        super.initializeHadoopStorageConfig();
+        //Set assumed_roles
+        //@See 
https://hadoop.apache.org/docs/r3.4.1/hadoop-aws/tools/hadoop-aws/assumed_roles.html
+        if (StringUtils.isNotBlank(s3ExternalId) && 
StringUtils.isNotBlank(s3IAMRole)) {
+            //@See org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
+            hadoopStorageConfig.set("fs.s3a.assumed.role.external.id", 
s3ExternalId);
+            hadoopStorageConfig.set("fs.s3a.assumed.role.arn", s3IAMRole);
+            hadoopStorageConfig.set("fs.s3a.aws.credentials.provider",
+                    
"org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
index 8835d64a47c..71a871200bb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -187,7 +187,7 @@ public class LogicalHudiScan extends LogicalFileScan {
         Optional<IncrementalRelation> newIncrementalRelation = 
Optional.empty();
         if (optScanParams.isPresent() && 
optScanParams.get().incrementalRead()) {
             TableScanParams scanParams = optScanParams.get();
-            Map<String, String> optParams = table.getHadoopProperties();
+            Map<String, String> optParams = 
table.getBackendStorageProperties();
             if (scanParams.getMapParams().containsKey("beginTime")) {
                 optParams.put("hoodie.datasource.read.begin.instanttime", 
scanParams.getMapParams().get("beginTime"));
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 0eaeed81d4c..d9f75763510 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -162,7 +162,7 @@ public class HiveTableSink extends 
BaseExternalTableDataSink {
             
tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
         }
 
-        tSink.setHadoopConfig(targetTable.getHadoopProperties());
+        tSink.setHadoopConfig(targetTable.getBackendStorageProperties());
 
         tDataSink = new TDataSink(getDataSinkType());
         tDataSink.setHiveTableSink(tSink);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to