This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-refactor_property
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-refactor_property by this push:
     new faeb0987097 [Feat](Storage)Refactor Storage Integration: Unified Storage Parameter Handling for Export, TVF, and Backup (#49163)
faeb0987097 is described below

commit faeb09870975d1dba5583da36c8627fc267adcba
Author: Calvin Kirs <[email protected]>
AuthorDate: Tue Mar 18 22:23:59 2025 +0800

    [Feat](Storage)Refactor Storage Integration: Unified Storage Parameter Handling for Export, TVF, and Backup (#49163)
    
    ## PR Description
    This PR introduces a unified storage parameter handling mechanism for
    storage-related functionalities, including Export, Table-Valued
    Functions (TVF), and Backup. The key objective is to decouple business
    logic from the underlying storage implementation, ensuring that the
    business layer only focuses on its domain logic while storage parameters
    manage the interaction with the storage system.
    
    ## Key Changes
    ### Centralized Storage Parameter Management:
    Instead of handling storage configurations in multiple places, storage
    parameters now serve as the sole interface between the business logic
    and the storage layer. This improves maintainability, avoids scattered
    storage-related logic, and simplifies future storage extensions.
    
    ### Separation of Concerns:
    The business layer is storage-agnostic, meaning it does not need to
    handle storage-specific details. Storage parameters are responsible for
    translating high-level storage configurations into system-specific
    settings.
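    
    A minimal Java sketch of this separation follows. It is an illustration
    under stated assumptions, not the code in this PR: the `Demo*` types, the
    `create` factory, and every property key are hypothetical stand-ins that
    only mirror the roles played by `StorageProperties.createStorageProperties(...)`
    and `getBackendConfigProperties()` in the diff below.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in for a storage-parameter abstraction (not the Doris class).
    interface DemoStorageProperties {
        // Translate user-facing properties into backend-specific settings.
        Map<String, String> backendConfig();

        // Pick an implementation from the raw user properties.
        // The keys checked here are illustrative assumptions.
        static DemoStorageProperties create(Map<String, String> raw) {
            if (raw.containsKey("s3.endpoint")) {
                return new DemoS3Properties(raw);
            }
            if (raw.containsKey("fs.defaultFS")) {
                return new DemoHdfsProperties(raw);
            }
            throw new IllegalArgumentException("unsupported storage properties: " + raw.keySet());
        }
    }

    final class DemoS3Properties implements DemoStorageProperties {
        private final Map<String, String> raw;

        DemoS3Properties(Map<String, String> raw) {
            this.raw = new HashMap<>(raw);
        }

        @Override
        public Map<String, String> backendConfig() {
            Map<String, String> out = new HashMap<>();
            // Assumed key mapping, for illustration only.
            out.put("AWS_ENDPOINT", raw.get("s3.endpoint"));
            out.put("AWS_REGION", raw.getOrDefault("s3.region", "us-east-1"));
            return out;
        }
    }

    final class DemoHdfsProperties implements DemoStorageProperties {
        private final Map<String, String> raw;

        DemoHdfsProperties(Map<String, String> raw) {
            this.raw = new HashMap<>(raw);
        }

        @Override
        public Map<String, String> backendConfig() {
            Map<String, String> out = new HashMap<>();
            out.put("fs.defaultFS", raw.get("fs.defaultFS"));
            return out;
        }
    }

    public class StoragePropertiesSketch {
        // Business-layer code: no S3/HDFS branches, only the abstraction.
        public static Map<String, String> exportSinkOptions(Map<String, String> userProps) {
            return DemoStorageProperties.create(userProps).backendConfig();
        }

        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("s3.endpoint", "s3.example.com");
            System.out.println(exportSinkOptions(props));
        }
    }
    ```
    
    In this sketch the caller (`exportSinkOptions`) never inspects key
    prefixes or storage types; adding another backend would mean adding one
    implementation and one branch in the factory.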
    
    ### Phase 1 Implementation:
    
    This PR focuses on integrating Export, TVF, and Backup with the new
    storage parameter mechanism. Subsequent PRs will address Broker and
    additional components, ensuring a smooth and incremental migration.
    
    ## Next Steps
    Gradually extend this unified approach to Broker and other
    storage-dependent functionalities. Continue refining the abstraction to
    support future storage systems with minimal modifications.
    
    ## Why This Change?
    - Enhances maintainability by consolidating storage-related logic.
    - Improves extensibility for future storage backends.
    - Simplifies business logic, ensuring a cleaner separation of concerns.
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 .../authentication/HadoopAuthenticator.java        |  6 ++
 fe/fe-core/pom.xml                                 |  5 ++
 .../java/org/apache/doris/analysis/BrokerDesc.java | 16 +++--
 .../org/apache/doris/analysis/OutFileClause.java   | 45 +-----------
 .../org/apache/doris/analysis/StorageDesc.java     |  7 ++
 .../org/apache/doris/backup/BackupHandler.java     | 63 +++++++---------
 .../java/org/apache/doris/backup/BackupJob.java    |  5 +-
 .../java/org/apache/doris/backup/Repository.java   | 12 +---
 .../java/org/apache/doris/backup/RestoreJob.java   | 84 +++++++++++-----------
 .../java/org/apache/doris/catalog/S3Resource.java  |  8 ++-
 .../org/apache/doris/common/util/BrokerUtil.java   |  7 +-
 .../datasource/property/ConnectionProperties.java  | 10 ++-
 .../connection/PaimonConnectionProperties.java     |  5 +-
 .../storage/AbstractObjectStorageProperties.java   |  9 ++-
 .../datasource/property/storage/COSProperties.java | 19 +++--
 .../property/storage/HDFSProperties.java           | 68 +++++++++++++++++-
 .../datasource/property/storage/OBSProperties.java | 19 +++--
 .../datasource/property/storage/OSSProperties.java | 17 +++--
 .../property/storage/ObjectStorageProperties.java  |  4 +-
 .../datasource/property/storage/S3Properties.java  | 38 +++++-----
 .../property/storage/StorageProperties.java        | 34 ++++++++-
 .../property/storage/StorageTypeMapper.java        | 50 +++++++++++++
 .../org/apache/doris/fs/FileSystemFactory.java     | 24 ++++---
 .../org/apache/doris/fs/PersistentFileSystem.java  |  5 ++
 .../org/apache/doris/fs/remote/S3FileSystem.java   | 22 +++---
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  | 25 +++----
 .../apache/doris/fs/remote/dfs/JFSFileSystem.java  |  7 +-
 .../apache/doris/fs/remote/dfs/OFSFileSystem.java  |  7 +-
 .../ExternalFileTableValuedFunction.java           |  2 +
 .../doris/tablefunction/S3TableValuedFunction.java | 58 ++-------------
 .../apache/doris/datasource/CatalogAPITest.java    |  5 +-
 .../property/metastore/HMSPropertiesTest.java      |  2 +-
 .../property/storage/COSPropertiesTest.java        | 10 +--
 .../property/storage/OBSPropertyTest.java          | 17 ++---
 .../property/storage/OSSPropertiesTest.java        | 17 ++---
 .../property/storage/S3PropertiesTest.java         | 12 +---
 fe/pom.xml                                         | 11 +++
 37 files changed, 430 insertions(+), 325 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
index c3cab5f410b..eeba82e280e 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.common.security.authentication;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
@@ -41,4 +42,9 @@ public interface HadoopAuthenticator {
             return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config);
         }
     }
+
+    static HadoopAuthenticator getHadoopAuthenticator(Configuration configuration) {
+        AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(configuration);
+        return getHadoopAuthenticator(authConfig);
+    }
 }
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index e1ae5dc60db..371481a2e91 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -141,6 +141,11 @@ under the License.
             <groupId>commons-validator</groupId>
             <artifactId>commons-validator</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.re2j</groupId>
+            <artifactId>re2j</artifactId>
+        </dependency>
+
         <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
         <dependency>
             <groupId>com.google.code.gson</groupId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
index 4755159379f..a68d9099558 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -23,8 +23,9 @@ import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PrintableMap;
-import org.apache.doris.datasource.property.S3ClientBEProperties;
 import org.apache.doris.datasource.property.constants.BosProperties;
+import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.PersistentFileSystem;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TFileType;
@@ -80,8 +81,10 @@ public class BrokerDesc extends StorageDesc implements Writable {
         } else {
             this.storageType = StorageBackend.StorageType.BROKER;
         }
-        this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
-        this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
+        this.storageProperties = StorageProperties.createStorageProperties(properties);
+        // we should use storage properties?
+        this.properties.putAll(storageProperties.getBackendConfigProperties());
+        this.convertedToS3 = ObjectStorageProperties.class.isInstance(this.storageProperties);
         if (this.convertedToS3) {
             this.storageType = StorageBackend.StorageType.S3;
         }
@@ -93,9 +96,11 @@ public class BrokerDesc extends StorageDesc implements Writable {
         if (properties != null) {
             this.properties.putAll(properties);
         }
+        this.storageProperties = StorageProperties.createStorageProperties(properties);
+        // we should use storage properties?
+        this.properties.putAll(storageProperties.getBackendConfigProperties());
         this.storageType = storageType;
-        this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
-        this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
+        this.convertedToS3 = ObjectStorageProperties.class.isInstance(this.storageProperties);
         if (this.convertedToS3) {
             this.storageType = StorageBackend.StorageType.S3;
         }
@@ -159,6 +164,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
                 LOG.warn("set to BROKER, because of exception", e);
             }
         }
+        this.storageProperties = StorageProperties.createStorageProperties(properties);
         storageType = st;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index d4a7b25ed5a..ff49c8a80cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -34,7 +34,6 @@ import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.common.util.ParseUtil;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
@@ -61,7 +60,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 // For syntax select * from tbl INTO OUTFILE xxxx
 public class OutFileClause {
@@ -616,14 +614,6 @@ public class OutFileClause {
         if (this.fileFormatType == TFileFormatType.FORMAT_ORC) {
             getOrcProperties(processedPropKeys);
         }
-
-        if (processedPropKeys.size() != properties.size()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("{} vs {}", processedPropKeys, properties);
-            }
-            throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
-                    .filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList()));
-        }
     }
 
     /**
@@ -647,38 +637,7 @@ public class OutFileClause {
             return;
         }
 
-        Map<String, String> brokerProps = Maps.newHashMap();
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) {
-                brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue());
-                processedPropKeys.add(entry.getKey());
-            } else if (entry.getKey().toLowerCase().startsWith(S3Properties.S3_PREFIX)
-                    || entry.getKey().toUpperCase().startsWith(S3Properties.Env.PROPERTIES_PREFIX)) {
-                brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
-            } else if (entry.getKey().contains(HdfsResource.HADOOP_FS_NAME)
-                    && storageType == StorageBackend.StorageType.HDFS) {
-                brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
-            } else if ((entry.getKey().startsWith(HADOOP_FS_PROP_PREFIX)
-                    || entry.getKey().startsWith(HADOOP_PROP_PREFIX))
-                    && storageType == StorageBackend.StorageType.HDFS) {
-                brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
-            }
-        }
-        if (storageType == StorageBackend.StorageType.S3) {
-            if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
-                brokerProps.put(PropertyConverter.USE_PATH_STYLE, properties.get(PropertyConverter.USE_PATH_STYLE));
-                processedPropKeys.add(PropertyConverter.USE_PATH_STYLE);
-            }
-            S3Properties.requiredS3Properties(brokerProps);
-        } else if (storageType == StorageBackend.StorageType.HDFS) {
-            if (!brokerProps.containsKey(HdfsResource.HADOOP_FS_NAME)) {
-                brokerProps.put(HdfsResource.HADOOP_FS_NAME, getFsName(filePath));
-            }
-        }
-        brokerDesc = new BrokerDesc(brokerName, storageType, brokerProps);
+        brokerDesc = new BrokerDesc(brokerName, storageType, properties);
     }
 
     public static String getFsName(String path) {
@@ -869,7 +828,7 @@ public class OutFileClause {
         sinkOptions.setWithBom(withBom);
 
         if (brokerDesc != null) {
-            sinkOptions.setBrokerProperties(brokerDesc.getProperties());
+            sinkOptions.setBrokerProperties(brokerDesc.getStorageProperties().getBackendConfigProperties());
             // broker_addresses of sinkOptions will be set in Coordinator.
             // Because we need to choose the nearest broker with the result sink node.
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
index 902f5d24b91..28ebdff71b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
@@ -17,7 +17,10 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
 import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
 
 import java.util.Map;
 
@@ -35,6 +38,9 @@ public class StorageDesc extends ResourceDesc {
     @SerializedName("st")
     protected StorageBackend.StorageType storageType;
 
+    @Getter
+    protected StorageProperties storageProperties;
+
     public StorageDesc() {
     }
 
@@ -42,6 +48,7 @@ public class StorageDesc extends ResourceDesc {
         this.name = name;
         this.storageType = storageType;
         this.properties = properties;
+        this.storageProperties = StorageProperties.createStorageProperties(properties);
     }
 
     public void setName(String name) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 040ab729a5f..d76147fca4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -48,10 +48,9 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
-import org.apache.doris.fs.remote.AzureFileSystem;
 import org.apache.doris.fs.remote.RemoteFileSystem;
-import org.apache.doris.fs.remote.S3FileSystem;
 import org.apache.doris.persist.BarrierLog;
 import org.apache.doris.task.DirMoveTask;
 import org.apache.doris.task.DownloadTask;
@@ -212,8 +211,9 @@ public class BackupHandler extends MasterDaemon implements Writable {
                     "broker does not exist: " + stmt.getBrokerName());
         }
 
-        RemoteFileSystem fileSystem = FileSystemFactory.get(stmt.getBrokerName(), stmt.getStorageType(),
-                    stmt.getProperties());
+        StorageProperties storageProperties = StorageProperties.createStorageProperties(stmt.getProperties());
+
+        RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties);
         long repoId = env.getNextId();
         Repository repo = new Repository(repoId, stmt.getName(), stmt.isReadOnly(), stmt.getLocation(), fileSystem);
 
@@ -235,44 +235,28 @@ public class BackupHandler extends MasterDaemon implements Writable {
             if (repo == null) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository does not exist");
             }
+            Map<String, String> allProperties = new HashMap<>(repo.getRemoteFileSystem().getProperties());
+            allProperties.putAll(stmt.getProperties());
+            StorageProperties newStorageProperties = StorageProperties.createStorageProperties(allProperties);
+            RemoteFileSystem fileSystem = FileSystemFactory.get(newStorageProperties);
 
-            if (repo.getRemoteFileSystem() instanceof S3FileSystem
-                    || repo.getRemoteFileSystem() instanceof AzureFileSystem) {
-                Map<String, String> oldProperties = new HashMap<>(stmt.getProperties());
-                Status status = repo.alterRepositoryS3Properties(oldProperties);
-                if (!status.ok()) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, status.getErrMsg());
-                }
-                RemoteFileSystem fileSystem = null;
-                if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
-                    fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
-                            StorageBackend.StorageType.S3, oldProperties);
-                } else if (repo.getRemoteFileSystem() instanceof AzureFileSystem) {
-                    fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
-                            StorageBackend.StorageType.AZURE, oldProperties);
-                }
-
-                Repository newRepo = new Repository(repo.getId(), repo.getName(), repo.isReadOnly(),
-                        repo.getLocation(), fileSystem);
-                if (!newRepo.ping()) {
-                    LOG.warn("Failed to connect repository {}. msg: {}", repo.getName(), repo.getErrorMsg());
-                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                            "Repo can not ping with new s3 properties");
-                }
+            Repository newRepo = new Repository(repo.getId(), repo.getName(), repo.isReadOnly(),
+                    repo.getLocation(), fileSystem);
+            if (!newRepo.ping()) {
+                LOG.warn("Failed to connect repository {}. msg: {}", repo.getName(), repo.getErrorMsg());
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                        "Repo can not ping with new storage properties");
+            }
 
-                Status st = repoMgr.alterRepo(newRepo, false /* not replay */);
-                if (!st.ok()) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                            "Failed to alter repository: " + st.getErrMsg());
-                }
-                for (AbstractJob job : getAllCurrentJobs()) {
-                    if (!job.isDone() && job.getRepoId() == repo.getId()) {
-                        job.updateRepo(newRepo);
-                    }
-                }
-            } else {
+            Status st = repoMgr.alterRepo(newRepo, false /* not replay */);
+            if (!st.ok()) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                        "Only support alter s3 or azure repository");
+                        "Failed to alter repository: " + st.getErrMsg());
+            }
+            for (AbstractJob job : getAllCurrentJobs()) {
+                if (!job.isDone() && job.getRepoId() == repo.getId()) {
+                    job.updateRepo(newRepo);
+                }
             }
         } finally {
             seqlock.unlock();
@@ -285,6 +269,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
     }
 
     // handle drop repository stmt
+    // todo when drop repository, we should check if there is any job running on it and should close fs
     public void dropRepository(String repoName) throws DdlException {
         tryLock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index b8c73584229..760103e846e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -39,7 +39,6 @@ import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.property.S3ClientBEProperties;
 import org.apache.doris.persist.BarrierLog;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -384,7 +383,7 @@ public class BackupJob extends AbstractJob implements GsonPostProcessable {
                     continue;
                 }
                 ((UploadTask) task).updateBrokerProperties(
-                                S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+                        repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties());
                 AgentTaskQueue.updateTask(beId, TTaskType.UPLOAD, signature, task);
             }
             LOG.info("finished to update upload job properties. {}", this);
@@ -777,7 +776,7 @@ public class BackupJob extends AbstractJob implements GsonPostProcessable {
                 long signature = env.getNextId();
                 UploadTask task = new UploadTask(null, beId, signature, jobId, dbId, srcToDest,
                         brokers.get(0),
-                        S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()),
+                        repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties(),
                         repo.getRemoteFileSystem().getStorageType(), repo.getLocation());
                 batchTask.addTask(task);
                 unfinishedTaskIds.put(signature, beId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index 1450ef9a034..ff3f011ffed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.PersistentFileSystem;
 import org.apache.doris.fs.remote.AzureFileSystem;
@@ -205,15 +206,8 @@ public class Repository implements Writable, GsonPostProcessable {
 
     @Override
     public void gsonPostProcess() {
-        StorageBackend.StorageType type = StorageBackend.StorageType.BROKER;
-        if (this.fileSystem.properties.containsKey(PersistentFileSystem.STORAGE_TYPE)) {
-            type = StorageBackend.StorageType.valueOf(
-                    this.fileSystem.properties.get(PersistentFileSystem.STORAGE_TYPE));
-            this.fileSystem.properties.remove(PersistentFileSystem.STORAGE_TYPE);
-        }
-        this.fileSystem = FileSystemFactory.get(this.fileSystem.getName(),
-                type,
-                this.fileSystem.getProperties());
+        StorageProperties storageProperties = StorageProperties.createStorageProperties(this.fileSystem.properties);
+        this.fileSystem = FileSystemFactory.get(storageProperties);
     }
 
     public long getId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index e563192c584..773e13d1143 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -67,7 +67,6 @@ import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.datasource.property.S3ClientBEProperties;
 import org.apache.doris.persist.ColocatePersistInfo;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -132,14 +131,14 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
     // CHECKSTYLE OFF
     public enum RestoreJobState {
         PENDING, // Job is newly created. Check and prepare meta in catalog. Create replica if necessary.
-                 // Waiting for replica creation finished synchronously, then sending snapshot tasks.
-                 // then transfer to CREATING.
+        // Waiting for replica creation finished synchronously, then sending snapshot tasks.
+        // then transfer to CREATING.
         CREATING, // Creating replica on BE. Transfer to SNAPSHOTING after all replicas created.
         SNAPSHOTING, // Waiting for snapshot finished. Than transfer to DOWNLOAD.
         DOWNLOAD, // Send download tasks.
         DOWNLOADING, // Waiting for download finished.
         COMMIT, // After download finished, all data is ready for taking effect.
-                    // Send movement tasks to BE, than transfer to COMMITTING
+        // Send movement tasks to BE, than transfer to COMMITTING
         COMMITTING, // wait all tasks finished. Transfer to FINISHED
         FINISHED,
         CANCELLED
@@ -231,9 +230,9 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
     }
 
     public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
-            ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
-            boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
-            boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) {
+                      ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
+                      boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
+                      boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) {
         super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
         this.backupTimestamp = backupTs;
         this.jobInfo = jobInfo;
@@ -262,10 +261,10 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
     }
 
     public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
-            ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
-            boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
-            boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId,
-            BackupMeta backupMeta) {
+                      ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
+                      boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
+                      boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId,
+                      BackupMeta backupMeta) {
         this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
                 reserveColocate, reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions,
                 isAtomicRestore, env, repoId);
@@ -317,7 +316,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
             Preconditions.checkState(task.getTabletId() == removedTabletId, removedTabletId);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("get finished snapshot info: {}, unfinished tasks num: {}, remove result: {}. {}",
-                          info, unfinishedSignatureToId.size(), this, removedTabletId);
+                        info, unfinishedSignatureToId.size(), this, removedTabletId);
             }
             return true;
         }
@@ -336,7 +335,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
             SnapshotInfo info = snapshotInfos.get(tabletId, task.getBackendId());
             if (info == null) {
                 LOG.warn("failed to find snapshot infos of tablet {} in be {}, {}",
-                          tabletId, task.getBackendId(), this);
+                        tabletId, task.getBackendId(), this);
                 return false;
             }
         }
@@ -427,7 +426,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
                     continue;
                 }
                 ((DownloadTask) task).updateBrokerProperties(
-                        S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+                        repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties());
                 AgentTaskQueue.updateTask(beId, TTaskType.DOWNLOAD, signature, task);
             }
             LOG.info("finished to update download job properties. {}", this);
@@ -544,17 +543,17 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
      * Restore rules as follow:
      * OlapTable
      * A. Table already exist
-     *      A1. Partition already exist, generate file mapping
-     *      A2. Partition does not exist, add restored partition to the table.
-     *          Reset all index/tablet/replica id, and create replica on BE outside the table lock.
+     * A1. Partition already exist, generate file mapping
+     * A2. Partition does not exist, add restored partition to the table.
+     * Reset all index/tablet/replica id, and create replica on BE outside the table lock.
      * B. Table does not exist
-     *      B1. Add table to the db, reset all table/index/tablet/replica id,
-     *          and create replica on BE outside the db lock.
+     * B1. Add table to the db, reset all table/index/tablet/replica id,
+     * and create replica on BE outside the db lock.
      * View
-     *      * A. View already exist. The same signature is allowed.
-     *      * B. View does not exist.
+     * * A. View already exist. The same signature is allowed.
+     * * B. View does not exist.
      * All newly created table/partition/index/tablet/replica should be saved for rolling back.
-     *
+     * <p>
      * Step:
      * 1. download and deserialize backup meta from repository.
      * 2. set all existing restored table's state to RESTORE.
@@ -727,7 +726,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
                         if (!localTblSignature.equals(remoteTblSignature)) {
                             String alias = 
jobInfo.getAliasByOriginNameIfSet(tableName);
                             LOG.warn("Table {} already exists but with 
different schema, "
-                                    + "local table: {}, remote table: {}",
+                                            + "local table: {}, remote table: 
{}",
                                     alias, localTblSignature, 
remoteTblSignature);
                             status = new Status(ErrCode.COMMON_ERROR, "Table "
                                     + alias + " already exist but with 
different schema");
@@ -788,7 +787,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
                                         if (reserveReplica) {
                                             PartitionInfo remotePartInfo = 
remoteOlapTbl.getPartitionInfo();
                                             restoreReplicaAlloc = 
remotePartInfo.getReplicaAllocation(
-                                                remotePartition.getId());
+                                                    remotePartition.getId());
                                         }
                                         Partition restorePart = 
resetPartitionForRestore(localOlapTbl, remoteOlapTbl,
                                                 partitionName,
@@ -837,7 +836,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
 
                     // Reset properties to correct values.
                     
remoteOlapTbl.resetPropertiesForRestore(reserveDynamicPartitionEnable, 
reserveReplica,
-                                                            replicaAlloc, 
isBeingSynced);
+                            replicaAlloc, isBeingSynced);
 
                     // DO NOT set remote table's new name here, cause we will 
still need the origin name later
                     // 
remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
@@ -1019,7 +1018,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         if (!(ok && createReplicaTasksLatch.getStatus().ok())) {
             // only show at most 10 results
             List<String> subList = 
createReplicaTasksLatch.getLeftMarks().stream().limit(10)
-                    .map(item -> "(backendId = " + item.getKey() + ", tabletId 
= "  + item.getValue() + ")")
+                    .map(item -> "(backendId = " + item.getKey() + ", tabletId 
= " + item.getValue() + ")")
                     .collect(Collectors.toList());
             String idStr = Joiner.on(", ").join(subList);
             String reason = "TIMEDOUT";
@@ -1146,7 +1145,8 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
                     if (schemaHash == -1) {
                         return new Status(ErrCode.COMMON_ERROR, String.format(
                                 "schema hash of local index %d is not found, 
remote table=%d, remote index=%d, "
-                                + "local table=%d, local index=%d", 
localIndexId, remoteOlapTbl.getId(), index.getId(),
+                                        + "local table=%d, local index=%d",
+                                localIndexId, remoteOlapTbl.getId(), 
index.getId(),
                                 localOlapTbl.getId(), localIndexId));
                     }
 
@@ -1154,8 +1154,8 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
                     List<Tablet> remoteTablets = index.getTablets();
                     if (localTablets.size() != remoteTablets.size()) {
                         LOG.warn("skip bind replicas because the size of local 
tablet {} is not equals to "
-                                + "the remote {}, is_atomic_restore=true, 
remote table={}, remote index={}, "
-                                + "local table={}, local index={}", 
localTablets.size(), remoteTablets.size(),
+                                        + "the remote {}, 
is_atomic_restore=true, remote table={}, remote index={}, "
+                                        + "local table={}, local index={}", 
localTablets.size(), remoteTablets.size(),
                                 remoteOlapTbl.getId(), index.getId(), 
localOlapTbl.getId(), localIndexId);
                         continue;
                     }
@@ -1165,9 +1165,11 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
                         List<Replica> localReplicas = 
localTablet.getReplicas();
                         List<Replica> remoteReplicas = 
remoteTablet.getReplicas();
                         if (localReplicas.size() != remoteReplicas.size()) {
-                            LOG.warn("skip bind replicas because the size of 
local replicas {} is not equals to "
-                                    + "the remote {}, is_atomic_restore=true, 
remote table={}, remote index={}, "
-                                    + "local table={}, local index={}, local 
tablet={}, remote tablet={}",
+                            LOG.warn("skip bind replicas because the size of 
local replicas {} is not"
+                                            + " equals to "
+                                            + "the remote {}, 
is_atomic_restore=true, remote table={}, remote index"
+                                            + "={}, "
+                                            + "local table={}, local index={}, 
local tablet={}, remote tablet={}",
                                     localReplicas.size(), 
remoteReplicas.size(), remoteOlapTbl.getId(),
                                     index.getId(), localOlapTbl.getId(), 
localIndexId, localTablet.getId(),
                                     remoteTablet.getId());
@@ -1284,8 +1286,9 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
     }
 
     private boolean genFileMappingWhenBackupReplicasEqual(PartitionInfo 
localPartInfo, Partition localPartition,
-            Table localTbl, BackupPartitionInfo backupPartInfo, String 
partitionName, BackupOlapTableInfo tblInfo,
-            ReplicaAllocation remoteReplicaAlloc) {
+                                                          Table localTbl, 
BackupPartitionInfo backupPartInfo,
+                                                          String 
partitionName, BackupOlapTableInfo tblInfo,
+                                                          ReplicaAllocation 
remoteReplicaAlloc) {
         short restoreReplicaNum;
         short localReplicaNum = 
localPartInfo.getReplicaAllocation(localPartition.getId()).getTotalReplicaNum();
         if (!reserveReplica) {
@@ -1315,7 +1318,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
     }
 
     private void createReplicas(Database db, AgentBatchTask batchTask, 
OlapTable localTbl, Partition restorePart,
-            Map<Long, TabletRef> tabletBases) {
+                                Map<Long, TabletRef> tabletBases) {
         Set<String> bfColumns = localTbl.getCopiedBfColumns();
         double bfFpp = localTbl.getBfFpp();
 
@@ -1331,7 +1334,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         for (MaterializedIndex restoredIdx : 
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
             MaterializedIndexMeta indexMeta = 
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
             List<Index> indexes = restoredIdx.getId() == 
localTbl.getBaseIndexId()
-                                    ? localTbl.getCopiedIndexes() : null;
+                    ? localTbl.getCopiedIndexes() : null;
             List<Integer> clusterKeyUids = null;
             if (indexMeta.getIndexId() == localTbl.getBaseIndexId() || 
localTbl.isShadowIndex(indexMeta.getIndexId())) {
                 clusterKeyUids = 
OlapTable.getClusterKeyUids(indexMeta.getSchema());
@@ -1470,12 +1473,13 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
 
     // files in repo to files in local
     private void genFileMapping(OlapTable localTbl, Partition localPartition, 
Long remoteTblId,
-            BackupPartitionInfo backupPartInfo, boolean overwrite) {
+                                BackupPartitionInfo backupPartInfo, boolean 
overwrite) {
         genFileMapping(localTbl, localPartition, remoteTblId, backupPartInfo, 
overwrite, null);
     }
 
     private void genFileMapping(OlapTable localTbl, Partition localPartition, 
Long remoteTblId,
-            BackupPartitionInfo backupPartInfo, boolean overwrite, Map<Long, 
TabletRef> tabletBases) {
+                                BackupPartitionInfo backupPartInfo, boolean 
overwrite,
+                                Map<Long, TabletRef> tabletBases) {
         for (MaterializedIndex localIdx : 
localPartition.getMaterializedIndices(IndexExtState.VISIBLE)) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("get index id: {}, index name: {}", localIdx.getId(),
@@ -1807,7 +1811,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
                         long signature = env.getNextId();
                         DownloadTask task = new DownloadTask(null, beId, signature, jobId, dbId, srcToDest,
                                 brokerAddrs.get(0),
-                                S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()),
+                                repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties(),
                                 repo.getRemoteFileSystem().getStorageType(), repo.getLocation());
                         batchTask.addTask(task);
                         unfinishedSignatureToId.put(signature, beId);
@@ -2416,7 +2420,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
             colocatePersistInfos.clear();
 
             LOG.info("finished to cancel restore job. current state: {}. is 
replay: {}. {}",
-                     curState.name(), isReplay, this);
+                    curState.name(), isReplay, this);
 
             // Send release snapshot tasks after log restore job, so that the 
snapshot won't be released
             // before the cancelled restore job is persisted.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
index 26747e826fd..0a1518567da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
 import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.remote.S3FileSystem;
 
 import com.google.common.base.Preconditions;
@@ -118,8 +119,11 @@ public class S3Resource extends Resource {
     }
 
     private static void pingS3(CloudCredentialWithEndpoint credential, String bucketName, String rootPath,
-            Map<String, String> properties) throws DdlException {
-        S3FileSystem fileSystem = new S3FileSystem(properties);
+                               Map<String, String> properties) throws DdlException {
+        org.apache.doris.datasource.property.storage.S3Properties s3params =
+                (org.apache.doris.datasource.property.storage.S3Properties) StorageProperties
+                        .createStorageProperties(properties);
+        S3FileSystem fileSystem = new S3FileSystem(s3params);
         String testFile = "s3://" + bucketName + "/" + rootPath + "/test-object-valid.txt";
         String content = "doris will be better";
         if (FeConstants.runningUnitTest) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index c5a2803b848..79dcad5ee6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -86,8 +86,8 @@ public class BrokerUtil {
             throws UserException {
         List<RemoteFile> rfiles = new ArrayList<>();
         try {
-            RemoteFileSystem fileSystem = FileSystemFactory.get(
-                    brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
+            //fixme do we need new file system every time?
+            RemoteFileSystem fileSystem = FileSystemFactory.get(brokerDesc.getStorageProperties());
             Status st = fileSystem.globList(path, rfiles, false);
             if (!st.ok()) {
                 throw new UserException(st.getErrMsg());
@@ -108,8 +108,7 @@ public class BrokerUtil {
     }
 
     public static void deleteDirectoryWithFileSystem(String path, BrokerDesc brokerDesc) throws UserException {
-        RemoteFileSystem fileSystem = FileSystemFactory.get(
-                brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
+        RemoteFileSystem fileSystem = FileSystemFactory.get(brokerDesc.getStorageProperties());
         Status st = fileSystem.deleteDirectory(path);
         if (!st.ok()) {
             throw new UserException(brokerDesc.getName() +  " delete directory exception. path="
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
index 2722e0d7c05..bb66d4b6f22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
@@ -21,13 +21,18 @@ import org.apache.doris.common.CatalogConfigFileUtils;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.hadoop.conf.Configuration;
 
 import java.lang.reflect.Field;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ConnectionProperties {
+public abstract class ConnectionProperties {
+    @Getter
+    @Setter
     protected Map<String, String> origProps;
 
     protected ConnectionProperties(Map<String, String> origProps) {
@@ -39,6 +44,7 @@ public class ConnectionProperties {
         Map<String, String> allProps = loadConfigFromFile(getResourceConfigPropName());
         // 2. overwrite result properties with original properties
         allProps.putAll(origProps);
+        Map<String, String> matchParams = new HashMap<>();
         // 3. set fields from resultProps
         List<Field> supportedProps = PropertyUtils.getConnectorProperties(this.getClass());
         for (Field field : supportedProps) {
@@ -49,6 +55,7 @@ public class ConnectionProperties {
                 if (allProps.containsKey(name)) {
                     try {
                         field.set(this, allProps.get(name));
+                        matchParams.put(name, allProps.get(name));
                     } catch (IllegalAccessException e) {
                         throw new RuntimeException("Failed to set property " + name + ", " + e.getMessage(), e);
                     }
@@ -58,6 +65,7 @@ public class ConnectionProperties {
         }
         // 3. check properties
         checkRequiredProperties();
+        setOrigProps(matchParams);
     }
 
     // Some properties may be loaded from file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/connection/PaimonConnectionProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/connection/PaimonConnectionProperties.java
index b9d429f8a1a..7aa6d12c684 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/connection/PaimonConnectionProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/connection/PaimonConnectionProperties.java
@@ -20,7 +20,6 @@ package org.apache.doris.datasource.property.connection;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.property.metastore.AliyunDLFProperties;
-import org.apache.doris.datasource.property.metastore.HMSProperties;
 import org.apache.doris.datasource.property.metastore.MetastoreProperties;
 import org.apache.doris.datasource.property.storage.HDFSProperties;
 import org.apache.doris.datasource.property.storage.S3Properties;
@@ -64,9 +63,9 @@ public class PaimonConnectionProperties {
         switch (metaProps.getType()) {
             case HMS:
                 options.set("metastore", "hive");
-                HMSProperties hmsProperties = (HMSProperties) metaProps;
+                //.HMSProperties hmsProperties = (HMSProperties) metaProps;
                 // TODO we need add all metastore parameters to paimon options?
-                hmsProperties.toPaimonOptionsAndConf(options);
+                // hmsProperties.toPaimonOptionsAndConf(options);
                 break;
             case DLF:
                 options.set("metastore", "hive");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
index c5a92be9bf6..b9d84d2a7c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
@@ -66,7 +66,7 @@ public abstract class AbstractObjectStorageProperties extends StorageProperties
      */
     @Setter
     @Getter
-    @ConnectorProperty(names = {"usePathStyle"}, required = false,
+    @ConnectorProperty(names = {"usePathStyle", "s3.path-style-access"}, required = false,
             description = "Whether to use path style URL for the storage.")
     protected boolean usePathStyle = false;
 
@@ -126,4 +126,11 @@ public abstract class AbstractObjectStorageProperties extends StorageProperties
                 String.valueOf(getMaxConnections()), String.valueOf(getRequestTimeoutS()),
                 String.valueOf(getConnectionTimeoutS()), String.valueOf(isUsePathStyle()));
     }
+
+    @Override
+    public Map<String, String> getBackendConfigProperties() {
+        Map<String, String> config = new HashMap<>();
+        toNativeS3Configuration(config);
+        return config;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
index 2db0736cb07..1791ba67a51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.property.storage;
 import org.apache.doris.datasource.property.ConnectorProperty;
 
 import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
 
 import java.util.Map;
 import java.util.regex.Matcher;
@@ -50,13 +51,19 @@ public class COSProperties extends AbstractObjectStorageProperties {
         super(Type.COS, origProps);
     }
 
+    protected static boolean guessIsMe(Map<String, String> origProps) {
+        return origProps.containsKey("cos.access_key");
+    }
+
     @Override
-    public void toHadoopConfiguration(Map<String, String> config) {
-        config.put("fs.cosn.bucket.region", getRegion());
-        config.put("fs.cos.endpoint", cosEndpoint);
-        config.put("fs.cosn.userinfo.secretId", cosAccessKey);
-        config.put("fs.cosn.userinfo.secretKey", cosSecretKey);
-        config.put("fs.cosn.impl", "org.apache.hadoop.fs.CosFileSystem");
+    public Configuration getHadoopConfiguration() {
+        Configuration conf = new Configuration(false);
+        conf.set("fs.cosn.bucket.region", getRegion());
+        conf.set("fs.cos.endpoint", cosEndpoint);
+        conf.set("fs.cosn.userinfo.secretId", cosAccessKey);
+        conf.set("fs.cosn.userinfo.secretKey", cosSecretKey);
+        conf.set("fs.cosn.impl", "org.apache.hadoop.fs.CosFileSystem");
+        return conf;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
index f4c684183ca..0b30b3f59b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
@@ -20,8 +20,10 @@ package org.apache.doris.datasource.property.storage;
 import org.apache.doris.datasource.property.ConnectorProperty;
 
 import com.google.common.base.Strings;
+import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.conf.Configuration;
 
+import java.util.HashMap;
 import java.util.Map;
 
 public class HDFSProperties extends StorageProperties {
@@ -57,8 +59,33 @@ public class HDFSProperties extends StorageProperties {
             description = "Whether to enable the impersonation of HDFS.")
     private boolean hdfsImpersonationEnabled = false;
 
+    /**
+     * The final HDFS configuration map that determines the effective settings.
+     * Priority rules:
+     * 1. If a key exists in `overrideConfig` (user-provided settings), its value takes precedence.
+     * 2. If a key is not present in `overrideConfig`, the value from `hdfs-site.xml` or `core-site.xml` is used.
+     * 3. This map should be used to read the resolved HDFS configuration, ensuring the correct precedence is applied.
+     */
+    Map<String, String> finalHdfsConfig;
+
     public HDFSProperties(Map<String, String> origProps) {
         super(Type.HDFS, origProps);
+        // to be care     setOrigProps(matchParams);
+        loadFinalHdfsConfig(origProps);
+    }
+
+    private void loadFinalHdfsConfig(Map<String, String> origProps) {
+        if (MapUtils.isEmpty(origProps)) {
+            return;
+        }
+        finalHdfsConfig = new HashMap<>();
+        Configuration configuration = new Configuration();
+        origProps.forEach((k, v) -> {
+            if (null != configuration.getTrimmed(k)) {
+                finalHdfsConfig.put(k, v);
+            }
+        });
+
     }
 
     @Override
@@ -88,13 +115,48 @@ public class HDFSProperties extends StorageProperties {
     public void toHadoopConfiguration(Configuration conf) {
         Map<String, String> allProps = loadConfigFromFile(getResourceConfigPropName());
         allProps.forEach(conf::set);
-        conf.set("hdfs.authentication.type", hdfsAuthenticationType);
+        if (MapUtils.isNotEmpty(finalHdfsConfig)) {
+            finalHdfsConfig.forEach(conf::set);
+        }
+        //todo waiting be support should use new params
+        conf.set("hdfs.security.authentication", hdfsAuthenticationType);
+        if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) {
+            conf.set("hadoop.kerberos.principal", hdfsKerberosPrincipal);
+            conf.set("hadoop.kerberos.keytab", hdfsKerberosKeytab);
+        }
+        if (!Strings.isNullOrEmpty(hadoopUsername)) {
+            conf.set("hadoop.username", hadoopUsername);
+        }
+    }
+
+    public Configuration getHadoopConfiguration() {
+        Configuration conf = new Configuration(false);
+        Map<String, String> allProps = loadConfigFromFile(getResourceConfigPropName());
+        allProps.forEach(conf::set);
+        if (MapUtils.isNotEmpty(finalHdfsConfig)) {
+            finalHdfsConfig.forEach(conf::set);
+        }
+        conf.set("hdfs.security.authentication", hdfsAuthenticationType);
         if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) {
-            conf.set("hdfs.authentication.kerberos.principal", hdfsKerberosPrincipal);
-            conf.set("hdfs.authentication.kerberos.keytab", hdfsKerberosKeytab);
+            conf.set("hadoop.kerberos.principal", hdfsKerberosPrincipal);
+            conf.set("hadoop.kerberos.keytab", hdfsKerberosKeytab);
         }
         if (!Strings.isNullOrEmpty(hadoopUsername)) {
             conf.set("hadoop.username", hadoopUsername);
         }
+
+        return conf;
+    }
+
+    //fixme be should send use input params
+    @Override
+    public Map<String, String> getBackendConfigProperties() {
+        Configuration configuration = getHadoopConfiguration();
+        Map<String, String> backendConfigProperties = new HashMap<>();
+        for (Map.Entry<String, String> entry : configuration) {
+            backendConfigProperties.put(entry.getKey(), entry.getValue());
+        }
+
+        return backendConfigProperties;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
index 2e36386fb64..1cdb4a83139 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.property.storage;
 import org.apache.doris.datasource.property.ConnectorProperty;
 
 import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
 
 import java.util.Map;
 import java.util.regex.Matcher;
@@ -44,13 +45,19 @@ public class OBSProperties extends AbstractObjectStorageProperties {
         // Initialize fields from origProps
     }
 
+    protected static boolean guessIsMe(Map<String, String> origProps) {
+        return origProps.containsKey("obs.access_key");
+    }
+
+
     @Override
-    public void toHadoopConfiguration(Map<String, String> config) {
-        config.put("fs.obs.endpoint", obsEndpoint);
-        config.put("fs.obs.access.key", obsAccessKey);
-        config.put("fs.obs.secret.key", obsSecretKey);
-        config.put("fs.obs.impl", "org.apache.hadoop.fs.obs.OBSFileSystem");
-        //set other k v if nessesary
+    public Configuration getHadoopConfiguration() {
+        Configuration conf = new Configuration(false);
+        conf.set("fs.obs.endpoint", obsEndpoint);
+        conf.set("fs.obs.access.key", obsAccessKey);
+        conf.set("fs.obs.secret.key", obsSecretKey);
+        conf.set("fs.obs.impl", "org.apache.hadoop.fs.obs.OBSFileSystem");
+        return conf;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
index 1498e4e7b1a..712a1ab61e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.property.storage;
 import org.apache.doris.datasource.property.ConnectorProperty;
 
 import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
 
 import java.util.Map;
 import java.util.regex.Matcher;
@@ -42,12 +43,18 @@ public class OSSProperties extends AbstractObjectStorageProperties {
         super(Type.OSS, origProps);
     }
 
+    protected static boolean guessIsMe(Map<String, String> origProps) {
+        return origProps.containsKey("oss.access_key");
+    }
+
     @Override
-    public void toHadoopConfiguration(Map<String, String> config) {
-        config.put("fs.oss.endpoint", endpoint);
-        config.put("fs.oss.accessKeyId", accessKey);
-        config.put("fs.oss.accessKeySecret", secretKey);
-        config.put("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+    public Configuration getHadoopConfiguration() {
+        Configuration conf = new Configuration(false);
+        conf.set("fs.oss.endpoint", endpoint);
+        conf.set("fs.oss.accessKeyId", accessKey);
+        conf.set("fs.oss.accessKeySecret", secretKey);
+        conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+        return conf;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
index a05d606d6da..2465fbf6192 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.datasource.property.storage;
 
+import org.apache.hadoop.conf.Configuration;
+
 import java.util.Map;
 
 /**
@@ -34,7 +36,7 @@ public interface ObjectStorageProperties {
      * @param config a map to populate with the HDFS-compatible configuration parameters.
      *               These parameters will be used by Hadoop clients to connect to the object storage system.
      */
-    void toHadoopConfiguration(Map<String, String> config);
+    Configuration getHadoopConfiguration();
 
     /**
      * Converts the object storage properties to a configuration map compatible with the
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index 4b1eb4e0012..ad4fcd2eb81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -23,9 +23,11 @@ import org.apache.doris.datasource.property.metastore.AliyunDLFProperties;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.options.Options;
 
 import java.lang.reflect.Field;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -49,11 +51,6 @@ public class S3Properties extends AbstractObjectStorageProperties {
             description = "The secret key of S3.")
     protected String s3SecretKey = "";
 
-    @ConnectorProperty(names = {"use_path_style",
-            "s3.path-style-access"},
-            required = false,
-            description = "Whether to use path style access.")
-    protected String usePathStyle = "false";
 
     @ConnectorProperty(names = {"s3.connection.maximum",
             "AWS_MAX_CONNECTIONS"},
@@ -150,25 +147,34 @@ public class S3Properties extends 
AbstractObjectStorageProperties {
         catalogProps.put("s3.access-key-id", s3AccessKey);
         catalogProps.put("s3.secret-access-key", s3SecretKey);
         catalogProps.put("client.region", s3Region);
-        catalogProps.put("s3.path-style-access", usePathStyle);
+        catalogProps.put("s3.path-style-access", 
Boolean.toString(usePathStyle));
     }
 
     @Override
-    public void toHadoopConfiguration(Map<String, String> config) {
-        config.put("fs.s3a.access.key", s3AccessKey);  // AWS Access Key
-        config.put("fs.s3a.secret.key", s3SecretKey);  // AWS Secret Key
-        config.put("fs.s3a.endpoint", s3Endpoint);
-        config.put("fs.s3a.region", s3Region);
-        config.put("fs.s3a.connection.maximum", 
String.valueOf(s3ConnectionMaximum));
-        config.put("fs.s3a.connection.timeout", 
String.valueOf(s3ConnectionRequestTimeoutS));
-        config.put("fs.s3a.request.timeout", 
String.valueOf(s3ConnectionTimeoutS));
-        config.put("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+    public Configuration getHadoopConfiguration() {
+        Configuration conf = new Configuration(false);
+        conf.set("fs.s3a.access.key", s3AccessKey);
+        conf.set("fs.s3a.secret.key", s3SecretKey);
+        conf.set("fs.s3a.endpoint", s3Endpoint);
+        conf.set("fs.s3a.region", s3Region);
+        conf.set("fs.s3a.connection.maximum", 
String.valueOf(s3ConnectionMaximum));
+        conf.set("fs.s3a.connection.timeout", 
String.valueOf(s3ConnectionRequestTimeoutS));
+        conf.set("fs.s3a.request.timeout", 
String.valueOf(s3ConnectionTimeoutS));
+        conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+        return conf;
     }
 
     @Override
     public void toNativeS3Configuration(Map<String, String> config) {
         Map<String, String> awsS3Properties = 
generateAWSS3Properties(s3Endpoint, s3Region, s3AccessKey, s3SecretKey,
-                s3ConnectionMaximum, s3ConnectionRequestTimeoutS, 
s3ConnectionTimeoutS, usePathStyle);
+                s3ConnectionMaximum, s3ConnectionRequestTimeoutS, 
s3ConnectionTimeoutS, String.valueOf(usePathStyle));
         config.putAll(awsS3Properties);
     }
+
+    @Override
+    public Map<String, String> getBackendConfigProperties() {
+        Map<String, String> config = new HashMap<>();
+        toNativeS3Configuration(config);
+        return config;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 8db195246e4..8fdc486b7e8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -28,7 +28,7 @@ import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
 
-public class StorageProperties extends ConnectionProperties {
+public abstract class StorageProperties extends ConnectionProperties {
 
     public static final String FS_HDFS_SUPPORT = "fs.hdfs.support";
     public static final String FS_S3_SUPPORT = "fs.s3.support";
@@ -47,6 +47,8 @@ public class StorageProperties extends ConnectionProperties {
         UNKNOWN
     }
 
+    public abstract Map<String, String> getBackendConfigProperties();
+
     @Getter
     protected Type type;
 
@@ -102,6 +104,35 @@ public class StorageProperties extends 
ConnectionProperties {
         return storageProperties;
     }
 
+    public static StorageProperties createStorageProperties(Map<String, 
String> origProps) {
+        StorageProperties storageProperties = null;
+        // 1. parse the storage properties by user specified fs.xxx.support 
properties
+        if (isFsSupport(origProps, FS_HDFS_SUPPORT)) {
+            storageProperties = new HDFSProperties(origProps);
+        }
+
+        if (isFsSupport(origProps, FS_S3_SUPPORT) || 
S3Properties.guessIsMe(origProps)) {
+            storageProperties = new S3Properties(origProps);
+        }
+        if (isFsSupport(origProps, FS_OSS_SUPPORT) || 
OSSProperties.guessIsMe(origProps)) {
+            storageProperties = new OSSProperties(origProps);
+        }
+        if (isFsSupport(origProps, FS_OBS_SUPPORT) || 
OBSProperties.guessIsMe(origProps)) {
+            storageProperties = new OBSProperties(origProps);
+        }
+        if (isFsSupport(origProps, FS_COS_SUPPORT) || 
COSProperties.guessIsMe(origProps)) {
+            storageProperties = new COSProperties(origProps);
+        }
+        if (null == storageProperties) {
+            throw new RuntimeException("not support this fs");
+        }
+        storageProperties.normalizedAndCheckProps();
+        //load from default file
+        return storageProperties;
+
+    }
+
+
     protected StorageProperties(Type type, Map<String, String> origProps) {
         super(origProps);
         this.type = type;
@@ -123,4 +154,5 @@ public class StorageProperties extends ConnectionProperties 
{
         }
         return false;
     }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageTypeMapper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageTypeMapper.java
new file mode 100644
index 00000000000..4cff2a1a675
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageTypeMapper.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.fs.remote.S3FileSystem;
+import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+
+import java.util.Arrays;
+import java.util.function.Function;
+
+public enum StorageTypeMapper {
+    HDFS(HDFSProperties.class, DFSFileSystem::new),
+    OSS(OSSProperties.class, S3FileSystem::new),
+    OBS(OBSProperties.class, S3FileSystem::new),
+    COS(COSProperties.class, S3FileSystem::new),
+    S3(S3Properties.class, S3FileSystem::new);
+
+    private final Class<? extends StorageProperties> propClass;
+    private final Function<StorageProperties, RemoteFileSystem> factory;
+
+    <T extends StorageProperties> StorageTypeMapper(Class<T> propClass, 
Function<T, RemoteFileSystem> factory) {
+        this.propClass = propClass;
+        this.factory = (prop) -> factory.apply(propClass.cast(prop));
+    }
+
+    public static RemoteFileSystem create(StorageProperties prop) {
+        return Arrays.stream(values())
+                .filter(type -> type.propClass.isInstance(prop))
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("Unknown storage 
type"))
+                .factory.apply(prop);
+    }
+}
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index fb23005f4ac..dd57e52ce4a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -18,14 +18,9 @@
 package org.apache.doris.fs;
 
 import org.apache.doris.analysis.StorageBackend;
-import org.apache.doris.datasource.property.constants.AzureProperties;
-import org.apache.doris.fs.remote.AzureFileSystem;
-import org.apache.doris.fs.remote.BrokerFileSystem;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.datasource.property.storage.StorageTypeMapper;
 import org.apache.doris.fs.remote.RemoteFileSystem;
-import org.apache.doris.fs.remote.S3FileSystem;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-import org.apache.doris.fs.remote.dfs.JFSFileSystem;
-import org.apache.doris.fs.remote.dfs.OFSFileSystem;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -36,7 +31,7 @@ import java.util.Map;
 public class FileSystemFactory {
 
     public static RemoteFileSystem get(String name, StorageBackend.StorageType 
type, Map<String, String> properties) {
-        // TODO: rename StorageBackend.StorageType
+        /*// TODO: rename StorageBackend.StorageType
         if (type == StorageBackend.StorageType.S3) {
             if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
                 return new AzureFileSystem(properties);
@@ -52,12 +47,18 @@ public class FileSystemFactory {
             return new BrokerFileSystem(name, properties);
         } else {
             throw new UnsupportedOperationException(type.toString() + "backend 
is not implemented");
-        }
+        }*/
+        return null;
     }
 
+    public static RemoteFileSystem get(StorageProperties storageProperties) {
+        return StorageTypeMapper.create(storageProperties);
+    }
+
+
     public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, 
Map<String, String> properties,
                                                        String bindBrokerName) {
-        switch (type) {
+        /*switch (type) {
             case S3:
                 if 
(AzureProperties.checkAzureProviderPropertyExist(properties)) {
                     return new AzureFileSystem(properties);
@@ -76,7 +77,8 @@ public class FileSystemFactory {
                 return new AzureFileSystem(properties);
             default:
                 throw new IllegalStateException("Not supported file system 
type: " + type);
-        }
+        }*/
+        return null;
     }
 
     public static RemoteFileSystem getS3FileSystem(Map<String, String> 
properties) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java
index 9cb156036d9..5ae1255a190 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java
@@ -19,10 +19,12 @@ package org.apache.doris.fs;
 
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.persist.gson.GsonPreProcessable;
 
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
 
 import java.io.DataInput;
 import java.io.IOException;
@@ -39,6 +41,9 @@ public abstract class PersistentFileSystem implements 
FileSystem, GsonPreProcess
     public String name;
     public StorageBackend.StorageType type;
 
+    @Getter
+    protected StorageProperties storageProperties;
+
     public boolean needFullPath() {
         return type == StorageBackend.StorageType.S3
                     || type == StorageBackend.StorageType.OFS
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index be53ffde2e0..5910ed80c9e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -22,9 +22,8 @@ import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-import org.apache.doris.datasource.property.PropertyConverter;
+import 
org.apache.doris.datasource.property.storage.AbstractObjectStorageProperties;
 import org.apache.doris.fs.obj.S3ObjStorage;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.google.common.annotations.VisibleForTesting;
@@ -44,10 +43,17 @@ public class S3FileSystem extends ObjFileSystem {
 
     private static final Logger LOG = LogManager.getLogger(S3FileSystem.class);
     private HadoopAuthenticator authenticator = null;
+    private AbstractObjectStorageProperties s3Properties;
 
-    public S3FileSystem(Map<String, String> properties) {
-        super(StorageBackend.StorageType.S3.name(), 
StorageBackend.StorageType.S3, new S3ObjStorage(properties));
+
+    public S3FileSystem(AbstractObjectStorageProperties s3Properties) {
+
+        super(StorageBackend.StorageType.S3.name(), 
StorageBackend.StorageType.S3,
+                new S3ObjStorage(s3Properties.getOrigProps()));
+        this.s3Properties = s3Properties;
+        this.storageProperties = s3Properties;
         initFsProperties();
+
     }
 
     @VisibleForTesting
@@ -72,14 +78,8 @@ public class S3FileSystem extends ObjFileSystem {
                     throw new UserException("FileSystem is closed.");
                 }
                 if (dfsFileSystem == null) {
-                    Configuration conf = 
DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth());
+                    Configuration conf = s3Properties.getHadoopConfiguration();
                     System.setProperty("com.amazonaws.services.s3.enableV4", 
"true");
-                    // the entry value in properties may be null, and
-                    
PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
-                            .filter(entry -> entry.getKey() != null && 
entry.getValue() != null)
-                            .forEach(entry -> conf.set(entry.getKey(), 
entry.getValue()));
-                    // S3 does not support Kerberos authentication,
-                    // so here we create a simple authentication
                     AuthenticationConfig authConfig = 
AuthenticationConfig.getSimpleAuthenticationConfig(conf);
                     HadoopAuthenticator authenticator = 
HadoopAuthenticator.getHadoopAuthenticator(authConfig);
                     try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 89f4af2817e..49023f5a989 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -20,9 +20,9 @@ package org.apache.doris.fs.remote.dfs;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.util.URI;
+import org.apache.doris.datasource.property.storage.HDFSProperties;
 import org.apache.doris.fs.operations.HDFSFileOperations;
 import org.apache.doris.fs.operations.HDFSOpParams;
 import org.apache.doris.fs.operations.OpParams;
@@ -55,7 +55,6 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 
 public class DFSFileSystem extends RemoteFileSystem {
 
@@ -63,14 +62,20 @@ public class DFSFileSystem extends RemoteFileSystem {
     private static final Logger LOG = 
LogManager.getLogger(DFSFileSystem.class);
     private HDFSFileOperations operations = null;
     private HadoopAuthenticator authenticator = null;
+    private HDFSProperties hdfsProperties;
 
-    public DFSFileSystem(Map<String, String> properties) {
-        this(StorageBackend.StorageType.HDFS, properties);
+    public DFSFileSystem(HDFSProperties hdfsProperties) {
+
+
+        super(StorageBackend.StorageType.HDFS.name(), 
StorageBackend.StorageType.HDFS);
+        this.properties.putAll(properties);
+        this.hdfsProperties = hdfsProperties;
     }
 
-    public DFSFileSystem(StorageBackend.StorageType type, Map<String, String> 
properties) {
-        super(type.name(), type);
+    public DFSFileSystem(HDFSProperties hdfsProperties, 
StorageBackend.StorageType storageType) {
+        super(storageType.name(), storageType);
         this.properties.putAll(properties);
+        this.hdfsProperties = hdfsProperties;
     }
 
     @VisibleForTesting
@@ -85,12 +90,8 @@ public class DFSFileSystem extends RemoteFileSystem {
                     throw new UserException("FileSystem is closed.");
                 }
                 if (dfsFileSystem == null) {
-                    Configuration conf = 
getHdfsConf(ifNotSetFallbackToSimpleAuth());
-                    for (Map.Entry<String, String> propEntry : 
properties.entrySet()) {
-                        conf.set(propEntry.getKey(), propEntry.getValue());
-                    }
-                    AuthenticationConfig authConfig = 
AuthenticationConfig.getKerberosConfig(conf);
-                    authenticator = 
HadoopAuthenticator.getHadoopAuthenticator(authConfig);
+                    Configuration conf = 
hdfsProperties.getHadoopConfiguration();
+                    authenticator = 
HadoopAuthenticator.getHadoopAuthenticator(conf);
                     try {
                         dfsFileSystem = authenticator.doAs(() -> {
                             try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/JFSFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/JFSFileSystem.java
index ffabb211d08..165aae1942a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/JFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/JFSFileSystem.java
@@ -18,11 +18,10 @@
 package org.apache.doris.fs.remote.dfs;
 
 import org.apache.doris.analysis.StorageBackend;
-
-import java.util.Map;
+import org.apache.doris.datasource.property.storage.HDFSProperties;
 
 public class JFSFileSystem extends DFSFileSystem {
-    public JFSFileSystem(Map<String, String> properties) {
-        super(StorageBackend.StorageType.JFS, properties);
+    public JFSFileSystem(HDFSProperties hdfsProperties) {
+        super(hdfsProperties, StorageBackend.StorageType.JFS);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/OFSFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/OFSFileSystem.java
index dd69a300392..bdc3ea8e771 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/OFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/OFSFileSystem.java
@@ -18,11 +18,10 @@
 package org.apache.doris.fs.remote.dfs;
 
 import org.apache.doris.analysis.StorageBackend;
-
-import java.util.Map;
+import org.apache.doris.datasource.property.storage.HDFSProperties;
 
 public class OFSFileSystem extends DFSFileSystem {
-    public OFSFileSystem(Map<String, String> properties) {
-        super(StorageBackend.StorageType.OFS, properties);
+    public OFSFileSystem(HDFSProperties properties) {
+        super(properties, StorageBackend.StorageType.OFS);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 9031efd0dc2..74883c2f20c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.common.util.FileFormatUtils;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.datasource.tvf.source.TVFScanNode;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.PlanNodeId;
@@ -108,6 +109,7 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
 
     protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
     protected Map<String, String> locationProperties = Maps.newHashMap();
+    protected StorageProperties storageProperties;
     protected String filePath;
 
     protected TFileFormatType fileFormatType;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 3defb171a9f..f6714995d67 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -22,12 +22,11 @@ import org.apache.doris.analysis.StorageBackend.StorageType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
 import org.apache.doris.common.util.S3URI;
 import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.S3ClientBEProperties;
 import org.apache.doris.datasource.property.constants.AzureProperties;
 import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.thrift.TFileType;
 
@@ -77,66 +76,26 @@ public class S3TableValuedFunction extends 
ExternalFileTableValuedFunction {
 
         // get endpoint first from properties, if not present, get it from s3 
uri.
         // If endpoint is missing, exception will be thrown.
-        String endpoint = constructEndpoint(otherProps, s3uri);
-        if (!otherProps.containsKey(S3Properties.REGION)) {
-            String region;
-            if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
-                // Azure could run without region
-                region = s3uri.getRegion().orElse("DUMMY-REGION");
-            } else {
-                region = s3uri.getRegion().orElseThrow(() -> new 
AnalysisException(
-                        String.format("Properties '%s' is required.", 
S3Properties.REGION)));
-            }
-            otherProps.put(S3Properties.REGION, region);
-        }
-        checkNecessaryS3Properties(otherProps);
-        CloudCredentialWithEndpoint credential = new 
CloudCredentialWithEndpoint(endpoint,
-                getOrDefaultAndRemove(otherProps, S3Properties.REGION, ""),
-                getOrDefaultAndRemove(otherProps, S3Properties.ACCESS_KEY, ""),
-                getOrDefaultAndRemove(otherProps, S3Properties.SECRET_KEY, 
""));
-        if (otherProps.containsKey(S3Properties.SESSION_TOKEN)) {
-            credential.setSessionToken(getOrDefaultAndRemove(otherProps, 
S3Properties.SESSION_TOKEN, ""));
-        }
-
-        locationProperties = S3Properties.credentialToMap(credential);
         locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
         if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
+            //todo waiting support for Azure
             // For Azure's compatibility, we need bucket to connect to the 
blob storage's container
             locationProperties.put(S3Properties.BUCKET, s3uri.getBucket());
         }
-        
locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(locationProperties));
+        this.storageProperties = 
StorageProperties.createStorageProperties(properties);
+        
locationProperties.putAll(storageProperties.getBackendConfigProperties());
         locationProperties.putAll(otherProps);
 
         filePath = NAME + S3URI.SCHEME_DELIM + s3uri.getBucket() + 
S3URI.PATH_DELIM + s3uri.getKey();
 
         if (FeConstants.runningUnitTest) {
             // Just check
-            FileSystemFactory.getS3FileSystem(locationProperties);
+            FileSystemFactory.get(storageProperties);
         } else {
             parseFile();
         }
     }
 
-    private String constructEndpoint(Map<String, String> properties, S3URI 
s3uri) throws AnalysisException {
-        String endpoint;
-        if (!AzureProperties.checkAzureProviderPropertyExist(properties)) {
-            // get endpoint first from properties, if not present, get it from 
s3 uri.
-            // If endpoint is missing, exception will be thrown.
-            endpoint = getOrDefaultAndRemove(properties, 
S3Properties.ENDPOINT, s3uri.getEndpoint().orElse(""));
-            if (Strings.isNullOrEmpty(endpoint)) {
-                throw new AnalysisException(String.format("Properties '%s' is 
required.", S3Properties.ENDPOINT));
-            }
-        } else {
-            String bucket = s3uri.getBucket();
-            String accountName = 
properties.getOrDefault(S3Properties.ACCESS_KEY, "");
-            if (accountName.isEmpty()) {
-                throw new AnalysisException(String.format("Properties '%s' is 
required.", S3Properties.ACCESS_KEY));
-            }
-            endpoint = String.format(AzureProperties.AZURE_ENDPOINT_TEMPLATE, 
accountName, bucket);
-        }
-        return endpoint;
-    }
-
     private void forwardCompatibleDeprecatedKeys(Map<String, String> props) {
         for (String deprecatedKey : DEPRECATED_KEYS) {
             String value = props.remove(deprecatedKey);
@@ -146,13 +105,6 @@ public class S3TableValuedFunction extends 
ExternalFileTableValuedFunction {
         }
     }
 
-    private void checkNecessaryS3Properties(Map<String, String> props) throws 
AnalysisException {
-        if (Strings.isNullOrEmpty(props.get(S3Properties.REGION))) {
-            throw new AnalysisException(String.format("Properties '%s' is 
required.", S3Properties.REGION));
-        }
-        // do not check ak and sk, because we can read them from system 
environment.
-    }
-
     private S3URI getS3Uri(String uri, boolean isPathStyle, boolean 
forceParsingStandardUri) throws AnalysisException {
         try {
             return S3URI.create(uri, isPathStyle, forceParsingStandardUri);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogAPITest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogAPITest.java
index 19eaa747b00..7fe062c7d03 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogAPITest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogAPITest.java
@@ -20,6 +20,8 @@ package org.apache.doris.datasource;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopAuthenticator;
+import org.apache.doris.datasource.property.storage.HDFSProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 
@@ -203,7 +205,8 @@ public class CatalogAPITest {
         properties.put("fs.oss.accessKeyId", ak);
         properties.put("fs.oss.accessKeySecret", sk);
         properties.put("fs.oss.endpoint", "cn-beijing.oss-dls.aliyuncs.com");
-        DFSFileSystem fs = new DFSFileSystem(properties);
+        HDFSProperties hdfsProperties = (HDFSProperties) 
StorageProperties.createStorageProperties(properties);
+        DFSFileSystem fs = new DFSFileSystem(hdfsProperties);
         List<RemoteFile> results = new ArrayList<>();
 
         Status st = fs.listFiles(remotePath, false, results);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/HMSPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/HMSPropertiesTest.java
index a51e3e98b33..9b44f80aa59 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/HMSPropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/HMSPropertiesTest.java
@@ -86,7 +86,7 @@ public class HMSPropertiesTest {
 
     private void testHmsToPaimonOptions(HMSProperties hmsProperties) {
         Options paimonOptions = new Options();
-        hmsProperties.toPaimonOptionsAndConf(paimonOptions);
+        //hmsProperties.toPaimonOptionsAndConf(paimonOptions);
         Assertions.assertEquals("thrift://127.0.0.1:9083", 
paimonOptions.get("uri"));
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
index 46d63b33209..18a0f710a86 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
@@ -56,8 +56,7 @@ public class COSPropertiesTest {
         origProps.put("cos.use_path_style", "true");
         origProps.put(StorageProperties.FS_COS_SUPPORT, "true");
         COSProperties cosProperties = (COSProperties) 
StorageProperties.create(origProps).get(1);
-        Map<String, String> config = new HashMap<>();
-        cosProperties.toHadoopConfiguration(config);
+        Configuration config = cosProperties.getHadoopConfiguration();
 
         // Validate the configuration
         Assertions.assertEquals("https://cos.example.com", 
config.get("fs.cos.endpoint"));
@@ -123,12 +122,7 @@ public class COSPropertiesTest {
         origProps.put(StorageProperties.FS_COS_SUPPORT, "true");
         COSProperties cosProperties = (COSProperties) 
StorageProperties.create(origProps).get(1);
 
-        Map<String, String> hdfsParams = new HashMap<>();
-        cosProperties.toHadoopConfiguration(hdfsParams);
-        Configuration configuration = new Configuration(false);
-        for (Map.Entry<String, String> entry : hdfsParams.entrySet()) {
-            configuration.set(entry.getKey(), entry.getValue());
-        }
+        Configuration configuration = cosProperties.getHadoopConfiguration();
         FileSystem fs = FileSystem.get(new URI(hdfsPath), configuration);
         FileStatus[] fileStatuses = fs.listStatus(new Path(hdfsPath));
         for (FileStatus status : fileStatuses) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
index 90742e61783..d4191d4be26 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
@@ -43,12 +43,12 @@ public class OBSPropertyTest {
         origProps.put(StorageProperties.FS_OBS_SUPPORT, "true");
 
         ObjectStorageProperties properties = (ObjectStorageProperties) 
StorageProperties.create(origProps).get(1);
-        properties.toHadoopConfiguration(origProps);
+        Configuration conf = properties.getHadoopConfiguration();
 
-        Assertions.assertEquals("https://obs.example.com", 
origProps.get("fs.obs.endpoint"));
-        Assertions.assertEquals("myOBSAccessKey", 
origProps.get("fs.obs.access.key"));
-        Assertions.assertEquals("myOBSSecretKey", 
origProps.get("fs.obs.secret.key"));
-        Assertions.assertEquals("org.apache.hadoop.fs.obs.OBSFileSystem", 
origProps.get("fs.obs.impl"));
+        Assertions.assertEquals("https://obs.example.com", 
conf.get("fs.obs.endpoint"));
+        Assertions.assertEquals("myOBSAccessKey", 
conf.get("fs.obs.access.key"));
+        Assertions.assertEquals("myOBSSecretKey", 
conf.get("fs.obs.secret.key"));
+        Assertions.assertEquals("org.apache.hadoop.fs.obs.OBSFileSystem", 
conf.get("fs.obs.impl"));
 
         // Test creation without additional properties
         origProps = new HashMap<>();
@@ -111,12 +111,7 @@ public class OBSPropertyTest {
         origProps.put("obs.endpoint", "obs.cn-north-4.myhuaweicloud.com");
         origProps.put(StorageProperties.FS_OBS_SUPPORT, "true");
         OBSProperties obsProperties = (OBSProperties) 
StorageProperties.create(origProps).get(1);
-        Map<String, String> hdfsParams = new HashMap<>();
-        obsProperties.toHadoopConfiguration(hdfsParams);
-        Configuration configuration = new Configuration(false);
-        for (Map.Entry<String, String> entry : hdfsParams.entrySet()) {
-            configuration.set(entry.getKey(), entry.getValue());
-        }
+        Configuration configuration = obsProperties.getHadoopConfiguration();
         FileSystem fs = FileSystem.get(new URI(hdfsPath), configuration);
         FileStatus[] fileStatuses = fs.listStatus(new Path(hdfsPath));
         for (FileStatus status : fileStatuses) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
index 097b002f5c8..8002f8781fa 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
@@ -45,11 +45,11 @@ public class OSSPropertiesTest {
         origProps.put("oss.secret_key", "myOSSSecretKey");
         origProps.put(StorageProperties.FS_OSS_SUPPORT, "true");
         ObjectStorageProperties properties = (ObjectStorageProperties) 
StorageProperties.create(origProps).get(1);
-        properties.toHadoopConfiguration(origProps);
-        Assertions.assertEquals("https://oss.aliyuncs.com", 
origProps.get("fs.oss.endpoint"));
-        Assertions.assertEquals("myOSSAccessKey", 
origProps.get("fs.oss.accessKeyId"));
-        Assertions.assertEquals("myOSSSecretKey", 
origProps.get("fs.oss.accessKeySecret"));
-        
Assertions.assertEquals("org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem", 
origProps.get("fs.oss.impl"));
+        Configuration conf = properties.getHadoopConfiguration();
+        Assertions.assertEquals("https://oss.aliyuncs.com", 
conf.get("fs.oss.endpoint"));
+        Assertions.assertEquals("myOSSAccessKey", 
conf.get("fs.oss.accessKeyId"));
+        Assertions.assertEquals("myOSSSecretKey", 
conf.get("fs.oss.accessKeySecret"));
+        
Assertions.assertEquals("org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem", 
conf.get("fs.oss.impl"));
         origProps = new HashMap<>();
         origProps.put("oss.endpoint", "https://oss.aliyuncs.com");
         StorageProperties.create(origProps);
@@ -105,12 +105,7 @@ public class OSSPropertiesTest {
         origProps.put(StorageProperties.FS_OSS_SUPPORT, "true");
         OSSProperties ossProperties = (OSSProperties) 
StorageProperties.create(origProps).get(1);
         // ossParams.put("fs.AbstractFileSystem.oss.impl", 
"com.aliyun.jindodata.oss.JindoOSS");
-        Map<String, String> hadoopParams = new HashMap<>();
-        ossProperties.toHadoopConfiguration(hadoopParams);
-        Configuration configuration = new Configuration(false);
-        for (Map.Entry<String, String> entry : hadoopParams.entrySet()) {
-            configuration.set(entry.getKey(), entry.getValue());
-        }
+        Configuration configuration = ossProperties.getHadoopConfiguration();
         FileSystem fs = FileSystem.get(new URI(hdfsPath), configuration);
         FileStatus[] fileStatuses = fs.listStatus(new Path(hdfsPath));
         for (FileStatus status : fileStatuses) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
index 83eb06afc8b..0e2085b90cc 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
@@ -52,9 +52,7 @@ public class S3PropertiesTest {
         origProps.put("s3.region", "us-west-1");
         origProps.put(StorageProperties.FS_S3_SUPPORT, "true");
         S3Properties s3Properties = (S3Properties) 
StorageProperties.create(origProps).get(1);
-        Map<String, String> config = new HashMap<>();
-        s3Properties.toHadoopConfiguration(config);
-
+        Configuration config = s3Properties.getHadoopConfiguration();
         // Validate the configuration
         Assertions.assertEquals("myS3AccessKey", 
config.get("fs.s3a.access.key"));
         Assertions.assertEquals("myS3SecretKey", 
config.get("fs.s3a.secret.key"));
@@ -118,13 +116,7 @@ public class S3PropertiesTest {
         origProps.put("s3.region", "ap-northeast-1");
         origProps.put(StorageProperties.FS_S3_SUPPORT, "true");
         S3Properties s3Properties = (S3Properties) 
StorageProperties.create(origProps).get(1);
-
-        Map<String, String> hdfsParams = new HashMap<>();
-        s3Properties.toHadoopConfiguration(hdfsParams);
-        Configuration configuration = new Configuration(false);
-        for (Map.Entry<String, String> entry : hdfsParams.entrySet()) {
-            configuration.set(entry.getKey(), entry.getValue());
-        }
+        Configuration configuration = s3Properties.getHadoopConfiguration();
         FileSystem fs = FileSystem.get(new URI(hdfsPath), configuration);
         FileStatus[] fileStatuses = fs.listStatus(new Path(hdfsPath));
         for (FileStatus status : fileStatuses) {
diff --git a/fe/pom.xml b/fe/pom.xml
index f0b1c03ffe7..cad105cd4c6 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -262,6 +262,7 @@ under the License.
         <javax.servlet-api.version>3.1.0</javax.servlet-api.version>
         <je.version>18.3.14-doris-SNAPSHOT</je.version>
         <jflex.version>1.4.3</jflex.version>
+        <re2j.version>1.8</re2j.version>
         <jmockit.version>1.49</jmockit.version>
         <commons-io.version>2.7</commons-io.version>
         <json-simple.version>1.1.1</json-simple.version>
@@ -602,6 +603,11 @@ under the License.
                     </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-client-api</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-client</artifactId>
@@ -693,6 +699,11 @@ under the License.
                     </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>com.google.re2j</groupId>
+                <artifactId>re2j</artifactId>
+                <version>${re2j.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.hbase.thirdparty</groupId>
                 <artifactId>hbase-shaded-gson</artifactId>

