This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-refactor_property
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-refactor_property by
this push:
new faeb0987097 [Feat](Storage)Refactor Storage Integration: Unified
Storage Parameter Handling for Export, TVF, and Backup (#49163)
faeb0987097 is described below
commit faeb09870975d1dba5583da36c8627fc267adcba
Author: Calvin Kirs <[email protected]>
AuthorDate: Tue Mar 18 22:23:59 2025 +0800
[Feat](Storage)Refactor Storage Integration: Unified Storage Parameter
Handling for Export, TVF, and Backup (#49163)
## PR Description
This PR introduces a unified storage parameter handling mechanism for
storage-related functionality, including Export, Table-Valued
Functions (TVF), and Backup. The key objective is to decouple business
logic from the underlying storage implementation, so that the business
layer focuses only on its domain logic while storage parameters manage
all interaction with the storage system.
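The diff wires this through calls such as `StorageProperties.createStorageProperties(properties)` and `getBackendConfigProperties()`. As a rough, self-contained sketch of the pattern only — the class names, property keys, and detection logic below are illustrative, not Doris's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of a unified storage-parameter factory: the business
// layer hands over a raw property map and gets back one object that knows
// how to translate it into backend-specific settings. Names are hypothetical.
abstract class StoragePropertiesSketch {
    protected final Map<String, String> raw;

    protected StoragePropertiesSketch(Map<String, String> raw) {
        this.raw = new HashMap<>(raw);
    }

    // Single entry point: detect the storage type from the raw properties.
    static StoragePropertiesSketch create(Map<String, String> props) {
        if (props.containsKey("s3.endpoint")) {
            return new S3Sketch(props);
        }
        if (props.containsKey("fs.defaultFS")) {
            return new HdfsSketch(props);
        }
        throw new IllegalArgumentException("unknown storage type");
    }

    // What the business layer forwards to backends, storage-specific.
    abstract Map<String, String> getBackendConfigProperties();
}

class S3Sketch extends StoragePropertiesSketch {
    S3Sketch(Map<String, String> raw) {
        super(raw);
    }

    @Override
    Map<String, String> getBackendConfigProperties() {
        Map<String, String> out = new HashMap<>();
        // normalize user-facing keys into backend-facing ones
        out.put("AWS_ENDPOINT", raw.get("s3.endpoint"));
        return out;
    }
}

class HdfsSketch extends StoragePropertiesSketch {
    HdfsSketch(Map<String, String> raw) {
        super(raw);
    }

    @Override
    Map<String, String> getBackendConfigProperties() {
        // HDFS-style props pass through unchanged in this sketch
        return new HashMap<>(raw);
    }
}
```

The point of the pattern: callers never branch on storage type themselves; they call `create(...)` once and forward whatever config the resulting object produces.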
## Key Changes
### Centralized Storage Parameter Management:
Instead of handling storage configurations in multiple places, storage
parameters now serve as the sole interface between the business logic
and the storage layer. This improves maintainability, avoids scattered
storage-related logic, and simplifies future storage extensions.
### Separation of Concerns:
The business layer is storage-agnostic, meaning it does not need to
handle storage-specific details. Storage parameters are responsible for
translating high-level storage configurations into system-specific
settings.
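A minimal sketch of what a storage-agnostic business layer can look like in practice — the business object depends only on an abstraction and forwards whatever backend config it produces. All names here are hypothetical, not taken from the Doris codebase:

```java
import java.util.Map;

// Hypothetical abstraction: the only storage-facing type the business
// layer is allowed to see.
interface StorageParams {
    Map<String, String> backendConfig();
}

// Hypothetical business-layer class (e.g. an export job): it never inspects
// endpoint/credential keys itself, it just forwards the translated config.
final class ExportTask {
    private final StorageParams params;

    ExportTask(StorageParams params) {
        this.params = params;
    }

    Map<String, String> buildSinkOptions() {
        return params.backendConfig();
    }
}
```

Swapping S3 for HDFS (or any future backend) then means swapping the `StorageParams` implementation, with no change to `ExportTask`.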
### Phase 1 Implementation:
This PR focuses on integrating Export, TVF, and Backup with the new
storage parameter mechanism. Subsequent PRs will address Broker and
additional components, ensuring a smooth and incremental migration.
## Next Steps
- Gradually extend this unified approach to Broker and other storage-dependent functionalities.
- Continue refining the abstraction to support future storage systems with minimal modifications.
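The diff also adds a `StorageTypeMapper` class; in the spirit of that, supporting a new storage system with minimal modifications could reduce to registering one new entry in a mapper, with no business-code changes. A hedged, self-contained sketch (names and structure hypothetical, not the committed implementation):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative type-to-mapper registry: each storage type contributes one
// function that turns raw user properties into backend config properties.
final class StorageTypeRegistry {
    private static final Map<String, Function<Map<String, String>, Map<String, String>>>
            MAPPERS = new HashMap<>();

    // Adding a future storage system = one register(...) call.
    static void register(String type,
            Function<Map<String, String>, Map<String, String>> mapper) {
        MAPPERS.put(type, mapper);
    }

    static Map<String, String> toBackendConfig(String type, Map<String, String> raw) {
        Function<Map<String, String>, Map<String, String>> mapper = MAPPERS.get(type);
        if (mapper == null) {
            throw new IllegalArgumentException("unsupported storage type: " + type);
        }
        return mapper.apply(raw);
    }
}
```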
## Why This Change?
- Enhances maintainability by consolidating storage-related logic.
- Improves extensibility for future storage backends.
- Simplifies business logic, ensuring a cleaner separation of concerns.
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../authentication/HadoopAuthenticator.java | 6 ++
fe/fe-core/pom.xml | 5 ++
.../java/org/apache/doris/analysis/BrokerDesc.java | 16 +++--
.../org/apache/doris/analysis/OutFileClause.java | 45 +-----------
.../org/apache/doris/analysis/StorageDesc.java | 7 ++
.../org/apache/doris/backup/BackupHandler.java | 63 +++++++---------
.../java/org/apache/doris/backup/BackupJob.java | 5 +-
.../java/org/apache/doris/backup/Repository.java | 12 +---
.../java/org/apache/doris/backup/RestoreJob.java | 84 +++++++++++-----------
.../java/org/apache/doris/catalog/S3Resource.java | 8 ++-
.../org/apache/doris/common/util/BrokerUtil.java | 7 +-
.../datasource/property/ConnectionProperties.java | 10 ++-
.../connection/PaimonConnectionProperties.java | 5 +-
.../storage/AbstractObjectStorageProperties.java | 9 ++-
.../datasource/property/storage/COSProperties.java | 19 +++--
.../property/storage/HDFSProperties.java | 68 +++++++++++++++++-
.../datasource/property/storage/OBSProperties.java | 19 +++--
.../datasource/property/storage/OSSProperties.java | 17 +++--
.../property/storage/ObjectStorageProperties.java | 4 +-
.../datasource/property/storage/S3Properties.java | 38 +++++-----
.../property/storage/StorageProperties.java | 34 ++++++++-
.../property/storage/StorageTypeMapper.java | 50 +++++++++++++
.../org/apache/doris/fs/FileSystemFactory.java | 24 ++++---
.../org/apache/doris/fs/PersistentFileSystem.java | 5 ++
.../org/apache/doris/fs/remote/S3FileSystem.java | 22 +++---
.../apache/doris/fs/remote/dfs/DFSFileSystem.java | 25 +++----
.../apache/doris/fs/remote/dfs/JFSFileSystem.java | 7 +-
.../apache/doris/fs/remote/dfs/OFSFileSystem.java | 7 +-
.../ExternalFileTableValuedFunction.java | 2 +
.../doris/tablefunction/S3TableValuedFunction.java | 58 ++-------------
.../apache/doris/datasource/CatalogAPITest.java | 5 +-
.../property/metastore/HMSPropertiesTest.java | 2 +-
.../property/storage/COSPropertiesTest.java | 10 +--
.../property/storage/OBSPropertyTest.java | 17 ++---
.../property/storage/OSSPropertiesTest.java | 17 ++---
.../property/storage/S3PropertiesTest.java | 12 +---
fe/pom.xml | 11 +++
37 files changed, 430 insertions(+), 325 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
index c3cab5f410b..eeba82e280e 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
@@ -17,6 +17,7 @@
package org.apache.doris.common.security.authentication;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
@@ -41,4 +42,9 @@ public interface HadoopAuthenticator {
             return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config);
         }
     }
+
+    static HadoopAuthenticator getHadoopAuthenticator(Configuration configuration) {
+        AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(configuration);
+        return getHadoopAuthenticator(authConfig);
+    }
 }
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index e1ae5dc60db..371481a2e91 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -141,6 +141,11 @@ under the License.
<groupId>commons-validator</groupId>
<artifactId>commons-validator</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.re2j</groupId>
+ <artifactId>re2j</artifactId>
+ </dependency>
+
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
index 4755159379f..a68d9099558 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -23,8 +23,9 @@ import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
-import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.datasource.property.constants.BosProperties;
+import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TFileType;
@@ -80,8 +81,10 @@ public class BrokerDesc extends StorageDesc implements Writable {
         } else {
             this.storageType = StorageBackend.StorageType.BROKER;
         }
-        this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
-        this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
+        this.storageProperties = StorageProperties.createStorageProperties(properties);
+        // we should use storage properties?
+        this.properties.putAll(storageProperties.getBackendConfigProperties());
+        this.convertedToS3 = ObjectStorageProperties.class.isInstance(this.storageProperties);
         if (this.convertedToS3) {
             this.storageType = StorageBackend.StorageType.S3;
         }
@@ -93,9 +96,11 @@ public class BrokerDesc extends StorageDesc implements Writable {
         if (properties != null) {
             this.properties.putAll(properties);
         }
+        this.storageProperties = StorageProperties.createStorageProperties(properties);
+        // we should use storage properties?
+        this.properties.putAll(storageProperties.getBackendConfigProperties());
         this.storageType = storageType;
-        this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
-        this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
+        this.convertedToS3 = ObjectStorageProperties.class.isInstance(this.storageProperties);
         if (this.convertedToS3) {
             this.storageType = StorageBackend.StorageType.S3;
         }
@@ -159,6 +164,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
                 LOG.warn("set to BROKER, because of exception", e);
             }
         }
+        this.storageProperties = StorageProperties.createStorageProperties(properties);
         storageType = st;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index d4a7b25ed5a..ff49c8a80cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -34,7 +34,6 @@ import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
@@ -61,7 +60,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
// For syntax select * from tbl INTO OUTFILE xxxx
public class OutFileClause {
@@ -616,14 +614,6 @@ public class OutFileClause {
         if (this.fileFormatType == TFileFormatType.FORMAT_ORC) {
             getOrcProperties(processedPropKeys);
         }
-
-        if (processedPropKeys.size() != properties.size()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("{} vs {}", processedPropKeys, properties);
-            }
-            throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
-                    .filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList()));
-        }
     }
/**
@@ -647,38 +637,7 @@ public class OutFileClause {
             return;
         }
-        Map<String, String> brokerProps = Maps.newHashMap();
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) {
-                brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue());
-                processedPropKeys.add(entry.getKey());
-            } else if (entry.getKey().toLowerCase().startsWith(S3Properties.S3_PREFIX)
-                    || entry.getKey().toUpperCase().startsWith(S3Properties.Env.PROPERTIES_PREFIX)) {
-                brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
-            } else if (entry.getKey().contains(HdfsResource.HADOOP_FS_NAME)
-                    && storageType == StorageBackend.StorageType.HDFS) {
-                brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
-            } else if ((entry.getKey().startsWith(HADOOP_FS_PROP_PREFIX)
-                    || entry.getKey().startsWith(HADOOP_PROP_PREFIX))
-                    && storageType == StorageBackend.StorageType.HDFS) {
-                brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
-            }
-        }
-        if (storageType == StorageBackend.StorageType.S3) {
-            if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
-                brokerProps.put(PropertyConverter.USE_PATH_STYLE, properties.get(PropertyConverter.USE_PATH_STYLE));
-                processedPropKeys.add(PropertyConverter.USE_PATH_STYLE);
-            }
-            S3Properties.requiredS3Properties(brokerProps);
-        } else if (storageType == StorageBackend.StorageType.HDFS) {
-            if (!brokerProps.containsKey(HdfsResource.HADOOP_FS_NAME)) {
-                brokerProps.put(HdfsResource.HADOOP_FS_NAME, getFsName(filePath));
-            }
-        }
-        brokerDesc = new BrokerDesc(brokerName, storageType, brokerProps);
+        brokerDesc = new BrokerDesc(brokerName, storageType, properties);
}
public static String getFsName(String path) {
@@ -869,7 +828,7 @@ public class OutFileClause {
         sinkOptions.setWithBom(withBom);
         if (brokerDesc != null) {
-            sinkOptions.setBrokerProperties(brokerDesc.getProperties());
+            sinkOptions.setBrokerProperties(brokerDesc.getStorageProperties().getBackendConfigProperties());
             // broker_addresses of sinkOptions will be set in Coordinator.
             // Because we need to choose the nearest broker with the result sink node.
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
index 902f5d24b91..28ebdff71b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
@@ -17,7 +17,10 @@
package org.apache.doris.analysis;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
import java.util.Map;
@@ -35,6 +38,9 @@ public class StorageDesc extends ResourceDesc {
@SerializedName("st")
protected StorageBackend.StorageType storageType;
+ @Getter
+ protected StorageProperties storageProperties;
+
public StorageDesc() {
}
@@ -42,6 +48,7 @@ public class StorageDesc extends ResourceDesc {
         this.name = name;
         this.storageType = storageType;
         this.properties = properties;
+        this.storageProperties = StorageProperties.createStorageProperties(properties);
     }
public void setName(String name) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 040ab729a5f..d76147fca4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -48,10 +48,9 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
-import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
-import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
@@ -212,8 +211,9 @@ public class BackupHandler extends MasterDaemon implements Writable {
                     "broker does not exist: " + stmt.getBrokerName());
         }
-        RemoteFileSystem fileSystem = FileSystemFactory.get(stmt.getBrokerName(), stmt.getStorageType(),
-                stmt.getProperties());
+        StorageProperties storageProperties = StorageProperties.createStorageProperties(stmt.getProperties());
+
+        RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties);
         long repoId = env.getNextId();
         Repository repo = new Repository(repoId, stmt.getName(), stmt.isReadOnly(), stmt.getLocation(), fileSystem);
@@ -235,44 +235,28 @@ public class BackupHandler extends MasterDaemon implements Writable {
             if (repo == null) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository does not exist");
             }
+            Map<String, String> allProperties = new HashMap<>(repo.getRemoteFileSystem().getProperties());
+            allProperties.putAll(stmt.getProperties());
+            StorageProperties newStorageProperties = StorageProperties.createStorageProperties(allProperties);
+            RemoteFileSystem fileSystem = FileSystemFactory.get(newStorageProperties);
-            if (repo.getRemoteFileSystem() instanceof S3FileSystem
-                    || repo.getRemoteFileSystem() instanceof AzureFileSystem) {
-                Map<String, String> oldProperties = new HashMap<>(stmt.getProperties());
-                Status status = repo.alterRepositoryS3Properties(oldProperties);
-                if (!status.ok()) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, status.getErrMsg());
-                }
-                RemoteFileSystem fileSystem = null;
-                if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
-                    fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
-                            StorageBackend.StorageType.S3, oldProperties);
-                } else if (repo.getRemoteFileSystem() instanceof AzureFileSystem) {
-                    fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
-                            StorageBackend.StorageType.AZURE, oldProperties);
-                }
-
-                Repository newRepo = new Repository(repo.getId(), repo.getName(), repo.isReadOnly(),
-                        repo.getLocation(), fileSystem);
-                if (!newRepo.ping()) {
-                    LOG.warn("Failed to connect repository {}. msg: {}", repo.getName(), repo.getErrorMsg());
-                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                            "Repo can not ping with new s3 properties");
-                }
+            Repository newRepo = new Repository(repo.getId(), repo.getName(), repo.isReadOnly(),
+                    repo.getLocation(), fileSystem);
+            if (!newRepo.ping()) {
+                LOG.warn("Failed to connect repository {}. msg: {}", repo.getName(), repo.getErrorMsg());
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                        "Repo can not ping with new storage properties");
+            }
-                Status st = repoMgr.alterRepo(newRepo, false /* not replay */);
-                if (!st.ok()) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                            "Failed to alter repository: " + st.getErrMsg());
-                }
-                for (AbstractJob job : getAllCurrentJobs()) {
-                    if (!job.isDone() && job.getRepoId() == repo.getId()) {
-                        job.updateRepo(newRepo);
-                    }
-                }
-            } else {
+            Status st = repoMgr.alterRepo(newRepo, false /* not replay */);
+            if (!st.ok()) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                        "Only support alter s3 or azure repository");
+                        "Failed to alter repository: " + st.getErrMsg());
+            }
+            for (AbstractJob job : getAllCurrentJobs()) {
+                if (!job.isDone() && job.getRepoId() == repo.getId()) {
+                    job.updateRepo(newRepo);
+                }
             }
         } finally {
             seqlock.unlock();
@@ -285,6 +269,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
     }
     // handle drop repository stmt
+    // todo when drop repository, we should check if there is any job running on it and should close fs
     public void dropRepository(String repoName) throws DdlException {
         tryLock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index b8c73584229..760103e846e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -39,7 +39,6 @@ import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -384,7 +383,7 @@ public class BackupJob extends AbstractJob implements GsonPostProcessable {
                 continue;
             }
             ((UploadTask) task).updateBrokerProperties(
-                    S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+                    repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties());
             AgentTaskQueue.updateTask(beId, TTaskType.UPLOAD, signature, task);
         }
         LOG.info("finished to update upload job properties. {}", this);
@@ -777,7 +776,7 @@ public class BackupJob extends AbstractJob implements GsonPostProcessable {
             long signature = env.getNextId();
             UploadTask task = new UploadTask(null, beId, signature, jobId, dbId, srcToDest,
                     brokers.get(0),
-                    S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()),
+                    repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties(),
                     repo.getRemoteFileSystem().getStorageType(), repo.getLocation());
             batchTask.addTask(task);
             unfinishedTaskIds.put(signature, beId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index 1450ef9a034..ff3f011ffed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.fs.remote.AzureFileSystem;
@@ -205,15 +206,8 @@ public class Repository implements Writable, GsonPostProcessable {
     @Override
     public void gsonPostProcess() {
-        StorageBackend.StorageType type = StorageBackend.StorageType.BROKER;
-        if (this.fileSystem.properties.containsKey(PersistentFileSystem.STORAGE_TYPE)) {
-            type = StorageBackend.StorageType.valueOf(
-                    this.fileSystem.properties.get(PersistentFileSystem.STORAGE_TYPE));
-            this.fileSystem.properties.remove(PersistentFileSystem.STORAGE_TYPE);
-        }
-        this.fileSystem = FileSystemFactory.get(this.fileSystem.getName(),
-                type,
-                this.fileSystem.getProperties());
+        StorageProperties storageProperties = StorageProperties.createStorageProperties(this.fileSystem.properties);
+        this.fileSystem = FileSystemFactory.get(storageProperties);
     }
public long getId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index e563192c584..773e13d1143 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -67,7 +67,6 @@ import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -132,14 +131,14 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
// CHECKSTYLE OFF
public enum RestoreJobState {
PENDING, // Job is newly created. Check and prepare meta in catalog.
Create replica if necessary.
- // Waiting for replica creation finished synchronously, then
sending snapshot tasks.
- // then transfer to CREATING.
+ // Waiting for replica creation finished synchronously, then sending
snapshot tasks.
+ // then transfer to CREATING.
CREATING, // Creating replica on BE. Transfer to SNAPSHOTING after all
replicas created.
SNAPSHOTING, // Waiting for snapshot finished. Than transfer to
DOWNLOAD.
DOWNLOAD, // Send download tasks.
DOWNLOADING, // Waiting for download finished.
COMMIT, // After download finished, all data is ready for taking
effect.
- // Send movement tasks to BE, than transfer to COMMITTING
+ // Send movement tasks to BE, than transfer to COMMITTING
COMMITTING, // wait all tasks finished. Transfer to FINISHED
FINISHED,
CANCELLED
@@ -231,9 +230,9 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
}
public RestoreJob(String label, String backupTs, long dbId, String dbName,
BackupJobInfo jobInfo, boolean allowLoad,
- ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion,
boolean reserveReplica,
- boolean reserveColocate, boolean reserveDynamicPartitionEnable,
boolean isBeingSynced,
- boolean isCleanTables, boolean isCleanPartitions, boolean
isAtomicRestore, Env env, long repoId) {
+ ReplicaAllocation replicaAlloc, long timeoutMs, int
metaVersion, boolean reserveReplica,
+ boolean reserveColocate, boolean
reserveDynamicPartitionEnable, boolean isBeingSynced,
+ boolean isCleanTables, boolean isCleanPartitions,
boolean isAtomicRestore, Env env, long repoId) {
super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
this.backupTimestamp = backupTs;
this.jobInfo = jobInfo;
@@ -262,10 +261,10 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
}
public RestoreJob(String label, String backupTs, long dbId, String dbName,
BackupJobInfo jobInfo, boolean allowLoad,
- ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion,
boolean reserveReplica,
- boolean reserveColocate, boolean reserveDynamicPartitionEnable,
boolean isBeingSynced,
- boolean isCleanTables, boolean isCleanPartitions, boolean
isAtomicRestore, Env env, long repoId,
- BackupMeta backupMeta) {
+ ReplicaAllocation replicaAlloc, long timeoutMs, int
metaVersion, boolean reserveReplica,
+ boolean reserveColocate, boolean
reserveDynamicPartitionEnable, boolean isBeingSynced,
+ boolean isCleanTables, boolean isCleanPartitions,
boolean isAtomicRestore, Env env, long repoId,
+ BackupMeta backupMeta) {
this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc,
timeoutMs, metaVersion, reserveReplica,
reserveColocate, reserveDynamicPartitionEnable, isBeingSynced,
isCleanTables, isCleanPartitions,
isAtomicRestore, env, repoId);
@@ -317,7 +316,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
Preconditions.checkState(task.getTabletId() == removedTabletId,
removedTabletId);
if (LOG.isDebugEnabled()) {
LOG.debug("get finished snapshot info: {}, unfinished tasks
num: {}, remove result: {}. {}",
- info, unfinishedSignatureToId.size(), this,
removedTabletId);
+ info, unfinishedSignatureToId.size(), this,
removedTabletId);
}
return true;
}
@@ -336,7 +335,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
SnapshotInfo info = snapshotInfos.get(tabletId,
task.getBackendId());
if (info == null) {
LOG.warn("failed to find snapshot infos of tablet {} in be {},
{}",
- tabletId, task.getBackendId(), this);
+ tabletId, task.getBackendId(), this);
return false;
}
}
@@ -427,7 +426,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
                 continue;
             }
             ((DownloadTask) task).updateBrokerProperties(
-                    S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+                    repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties());
             AgentTaskQueue.updateTask(beId, TTaskType.DOWNLOAD, signature, task);
         }
         LOG.info("finished to update download job properties. {}", this);
@@ -544,17 +543,17 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
* Restore rules as follow:
* OlapTable
* A. Table already exist
- * A1. Partition already exist, generate file mapping
- * A2. Partition does not exist, add restored partition to the table.
- * Reset all index/tablet/replica id, and create replica on BE
outside the table lock.
+ * A1. Partition already exist, generate file mapping
+ * A2. Partition does not exist, add restored partition to the table.
+ * Reset all index/tablet/replica id, and create replica on BE outside the
table lock.
* B. Table does not exist
- * B1. Add table to the db, reset all table/index/tablet/replica id,
- * and create replica on BE outside the db lock.
+ * B1. Add table to the db, reset all table/index/tablet/replica id,
+ * and create replica on BE outside the db lock.
* View
- * * A. View already exist. The same signature is allowed.
- * * B. View does not exist.
+ * * A. View already exist. The same signature is allowed.
+ * * B. View does not exist.
* All newly created table/partition/index/tablet/replica should be saved
for rolling back.
- *
+ * <p>
* Step:
* 1. download and deserialize backup meta from repository.
* 2. set all existing restored table's state to RESTORE.
@@ -727,7 +726,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
if (!localTblSignature.equals(remoteTblSignature)) {
String alias =
jobInfo.getAliasByOriginNameIfSet(tableName);
LOG.warn("Table {} already exists but with
different schema, "
- + "local table: {}, remote table: {}",
+ + "local table: {}, remote table:
{}",
alias, localTblSignature,
remoteTblSignature);
status = new Status(ErrCode.COMMON_ERROR, "Table "
+ alias + " already exist but with
different schema");
@@ -788,7 +787,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
if (reserveReplica) {
PartitionInfo remotePartInfo =
remoteOlapTbl.getPartitionInfo();
restoreReplicaAlloc =
remotePartInfo.getReplicaAllocation(
- remotePartition.getId());
+ remotePartition.getId());
}
Partition restorePart =
resetPartitionForRestore(localOlapTbl, remoteOlapTbl,
partitionName,
@@ -837,7 +836,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
// Reset properties to correct values.
remoteOlapTbl.resetPropertiesForRestore(reserveDynamicPartitionEnable,
reserveReplica,
- replicaAlloc,
isBeingSynced);
+ replicaAlloc, isBeingSynced);
// DO NOT set remote table's new name here, cause we will
still need the origin name later
//
remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
@@ -1019,7 +1018,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
if (!(ok && createReplicaTasksLatch.getStatus().ok())) {
// only show at most 10 results
List<String> subList =
createReplicaTasksLatch.getLeftMarks().stream().limit(10)
- .map(item -> "(backendId = " + item.getKey() + ", tabletId
= " + item.getValue() + ")")
+ .map(item -> "(backendId = " + item.getKey() + ", tabletId
= " + item.getValue() + ")")
.collect(Collectors.toList());
String idStr = Joiner.on(", ").join(subList);
String reason = "TIMEDOUT";
@@ -1146,7 +1145,8 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
if (schemaHash == -1) {
return new Status(ErrCode.COMMON_ERROR, String.format(
"schema hash of local index %d is not found,
remote table=%d, remote index=%d, "
- + "local table=%d, local index=%d",
localIndexId, remoteOlapTbl.getId(), index.getId(),
+ + "local table=%d, local index=%d",
+ localIndexId, remoteOlapTbl.getId(),
index.getId(),
localOlapTbl.getId(), localIndexId));
}
@@ -1154,8 +1154,8 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
List<Tablet> remoteTablets = index.getTablets();
if (localTablets.size() != remoteTablets.size()) {
LOG.warn("skip bind replicas because the size of local
tablet {} is not equals to "
- + "the remote {}, is_atomic_restore=true,
remote table={}, remote index={}, "
- + "local table={}, local index={}",
localTablets.size(), remoteTablets.size(),
+ + "the remote {},
is_atomic_restore=true, remote table={}, remote index={}, "
+ + "local table={}, local index={}",
localTablets.size(), remoteTablets.size(),
remoteOlapTbl.getId(), index.getId(),
localOlapTbl.getId(), localIndexId);
continue;
}
@@ -1165,9 +1165,11 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
                 List<Replica> localReplicas = localTablet.getReplicas();
                 List<Replica> remoteReplicas = remoteTablet.getReplicas();
                 if (localReplicas.size() != remoteReplicas.size()) {
-                    LOG.warn("skip bind replicas because the size of local replicas {} is not equals to "
-                            + "the remote {}, is_atomic_restore=true, remote table={}, remote index={}, "
-                            + "local table={}, local index={}, local tablet={}, remote tablet={}",
+                    LOG.warn("skip bind replicas because the size of local replicas {} is not"
+                            + " equals to "
+                            + "the remote {}, is_atomic_restore=true, remote table={}, remote index"
+                            + "={}, "
+                            + "local table={}, local index={}, local tablet={}, remote tablet={}",
                             localReplicas.size(), remoteReplicas.size(), remoteOlapTbl.getId(),
                             index.getId(), localOlapTbl.getId(), localIndexId, localTablet.getId(),
                             remoteTablet.getId());
@@ -1284,8 +1286,9 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
     }

     private boolean genFileMappingWhenBackupReplicasEqual(PartitionInfo localPartInfo, Partition localPartition,
-            Table localTbl, BackupPartitionInfo backupPartInfo, String partitionName, BackupOlapTableInfo tblInfo,
-            ReplicaAllocation remoteReplicaAlloc) {
+                                                          Table localTbl, BackupPartitionInfo backupPartInfo,
+                                                          String partitionName, BackupOlapTableInfo tblInfo,
+                                                          ReplicaAllocation remoteReplicaAlloc) {
         short restoreReplicaNum;
         short localReplicaNum = localPartInfo.getReplicaAllocation(localPartition.getId()).getTotalReplicaNum();
         if (!reserveReplica) {
@@ -1315,7 +1318,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
     }

     private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable localTbl, Partition restorePart,
-            Map<Long, TabletRef> tabletBases) {
+                                Map<Long, TabletRef> tabletBases) {
         Set<String> bfColumns = localTbl.getCopiedBfColumns();
         double bfFpp = localTbl.getBfFpp();
@@ -1331,7 +1334,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
         for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
             MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId());
             List<Index> indexes = restoredIdx.getId() == localTbl.getBaseIndexId()
-                    ? localTbl.getCopiedIndexes() : null;
+                    ? localTbl.getCopiedIndexes() : null;
             List<Integer> clusterKeyUids = null;
             if (indexMeta.getIndexId() == localTbl.getBaseIndexId() || localTbl.isShadowIndex(indexMeta.getIndexId())) {
                 clusterKeyUids = OlapTable.getClusterKeyUids(indexMeta.getSchema());
@@ -1470,12 +1473,13 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
     // files in repo to files in local
     private void genFileMapping(OlapTable localTbl, Partition localPartition, Long remoteTblId,
-            BackupPartitionInfo backupPartInfo, boolean overwrite) {
+                                BackupPartitionInfo backupPartInfo, boolean overwrite) {
         genFileMapping(localTbl, localPartition, remoteTblId, backupPartInfo, overwrite, null);
     }

     private void genFileMapping(OlapTable localTbl, Partition localPartition, Long remoteTblId,
-            BackupPartitionInfo backupPartInfo, boolean overwrite, Map<Long, TabletRef> tabletBases) {
+                                BackupPartitionInfo backupPartInfo, boolean overwrite,
+                                Map<Long, TabletRef> tabletBases) {
         for (MaterializedIndex localIdx : localPartition.getMaterializedIndices(IndexExtState.VISIBLE)) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("get index id: {}, index name: {}", localIdx.getId(),
@@ -1807,7 +1811,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
                     long signature = env.getNextId();
                     DownloadTask task = new DownloadTask(null, beId, signature, jobId, dbId, srcToDest,
                             brokerAddrs.get(0),
-                            S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()),
+                            repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties(),
                             repo.getRemoteFileSystem().getStorageType(), repo.getLocation());
                     batchTask.addTask(task);
                     unfinishedSignatureToId.put(signature, beId);
@@ -2416,7 +2420,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
             colocatePersistInfos.clear();
             LOG.info("finished to cancel restore job. current state: {}. is replay: {}. {}",
-                    curState.name(), isReplay, this);
+                    curState.name(), isReplay, this);

             // Send release snapshot tasks after log restore job, so that the snapshot won't be released
             // before the cancelled restore job is persisted.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
index 26747e826fd..0a1518567da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
 import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.remote.S3FileSystem;

 import com.google.common.base.Preconditions;
@@ -118,8 +119,11 @@ public class S3Resource extends Resource {
     }

     private static void pingS3(CloudCredentialWithEndpoint credential, String bucketName, String rootPath,
-            Map<String, String> properties) throws DdlException {
-        S3FileSystem fileSystem = new S3FileSystem(properties);
+                               Map<String, String> properties) throws DdlException {
+        org.apache.doris.datasource.property.storage.S3Properties s3params =
+                (org.apache.doris.datasource.property.storage.S3Properties) StorageProperties
+                        .createStorageProperties(properties);
+        S3FileSystem fileSystem = new S3FileSystem(s3params);
         String testFile = "s3://" + bucketName + "/" + rootPath + "/test-object-valid.txt";
         String content = "doris will be better";
         if (FeConstants.runningUnitTest) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index c5a2803b848..79dcad5ee6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -86,8 +86,8 @@ public class BrokerUtil {
             throws UserException {
         List<RemoteFile> rfiles = new ArrayList<>();
         try {
-            RemoteFileSystem fileSystem = FileSystemFactory.get(
-                    brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
+            //fixme do we need new file system every time?
+            RemoteFileSystem fileSystem = FileSystemFactory.get(brokerDesc.getStorageProperties());
             Status st = fileSystem.globList(path, rfiles, false);
             if (!st.ok()) {
                 throw new UserException(st.getErrMsg());
@@ -108,8 +108,7 @@ public class BrokerUtil {
     }

     public static void deleteDirectoryWithFileSystem(String path, BrokerDesc brokerDesc) throws UserException {
-        RemoteFileSystem fileSystem = FileSystemFactory.get(
-                brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
+        RemoteFileSystem fileSystem = FileSystemFactory.get(brokerDesc.getStorageProperties());
         Status st = fileSystem.deleteDirectory(path);
         if (!st.ok()) {
             throw new UserException(brokerDesc.getName() + " delete directory exception. path="
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
index 2722e0d7c05..bb66d4b6f22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
@@ -21,13 +21,18 @@ import org.apache.doris.common.CatalogConfigFileUtils;

 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.hadoop.conf.Configuration;

 import java.lang.reflect.Field;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

-public class ConnectionProperties {
+public abstract class ConnectionProperties {
+    @Getter
+    @Setter
     protected Map<String, String> origProps;

     protected ConnectionProperties(Map<String, String> origProps) {
@@ -39,6 +44,7 @@ public class ConnectionProperties {
         Map<String, String> allProps = loadConfigFromFile(getResourceConfigPropName());
         // 2. overwrite result properties with original properties
         allProps.putAll(origProps);
+        Map<String, String> matchParams = new HashMap<>();
         // 3. set fields from resultProps
         List<Field> supportedProps = PropertyUtils.getConnectorProperties(this.getClass());
         for (Field field : supportedProps) {
@@ -49,6 +55,7 @@ public class ConnectionProperties {
             if (allProps.containsKey(name)) {
                 try {
                     field.set(this, allProps.get(name));
+                    matchParams.put(name, allProps.get(name));
                 } catch (IllegalAccessException e) {
                     throw new RuntimeException("Failed to set property " + name + ", " + e.getMessage(), e);
                 }
@@ -58,6 +65,7 @@ public class ConnectionProperties {
         }
         // 3. check properties
         checkRequiredProperties();
+        setOrigProps(matchParams);
     }

     // Some properties may be loaded from file
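The hunk above changes property binding so that defaults loaded from a config file are overwritten by user-supplied properties, and only the keys a connector actually declares are written back as `origProps` (the `matchParams` map). A standalone sketch of that resolution order, with all names hypothetical and no Doris types involved:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Sketch of the resolution order in the ConnectionProperties hunk above:
// file defaults < user properties, then keep only declared keys.
public class PropertyResolution {
    public static Map<String, String> resolve(Map<String, String> fileDefaults,
                                              Map<String, String> userProps,
                                              Set<String> supportedKeys) {
        Map<String, String> all = new HashMap<>(fileDefaults);
        all.putAll(userProps); // user-provided values take precedence
        Map<String, String> matched = new LinkedHashMap<>();
        for (String key : supportedKeys) {
            if (all.containsKey(key)) {
                matched.put(key, all.get(key)); // mirrors matchParams in the diff
            }
        }
        return matched;
    }
}
```

One consequence visible in the sketch: properties the connector does not declare (here, any key outside `supportedKeys`) are silently dropped from the resolved map.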
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/connection/PaimonConnectionProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/connection/PaimonConnectionProperties.java
index b9d429f8a1a..7aa6d12c684 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/connection/PaimonConnectionProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/connection/PaimonConnectionProperties.java
@@ -20,7 +20,6 @@ package org.apache.doris.datasource.property.connection;

 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.property.metastore.AliyunDLFProperties;
-import org.apache.doris.datasource.property.metastore.HMSProperties;
 import org.apache.doris.datasource.property.metastore.MetastoreProperties;
 import org.apache.doris.datasource.property.storage.HDFSProperties;
 import org.apache.doris.datasource.property.storage.S3Properties;
@@ -64,9 +63,9 @@ public class PaimonConnectionProperties {
         switch (metaProps.getType()) {
             case HMS:
                 options.set("metastore", "hive");
-                HMSProperties hmsProperties = (HMSProperties) metaProps;
+                //.HMSProperties hmsProperties = (HMSProperties) metaProps;
                 // TODO we need add all metastore parameters to paimon options?
-                hmsProperties.toPaimonOptionsAndConf(options);
+                // hmsProperties.toPaimonOptionsAndConf(options);
                 break;
             case DLF:
                 options.set("metastore", "hive");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
index c5a92be9bf6..b9d84d2a7c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
@@ -66,7 +66,7 @@ public abstract class AbstractObjectStorageProperties extends StorageProperties
      */
     @Setter
     @Getter
-    @ConnectorProperty(names = {"usePathStyle"}, required = false,
+    @ConnectorProperty(names = {"usePathStyle", "s3.path-style-access"}, required = false,
             description = "Whether to use path style URL for the storage.")
     protected boolean usePathStyle = false;

@@ -126,4 +126,11 @@ public abstract class AbstractObjectStorageProperties extends StorageProperties
                 String.valueOf(getMaxConnections()), String.valueOf(getRequestTimeoutS()),
                 String.valueOf(getConnectionTimeoutS()), String.valueOf(isUsePathStyle()));
     }
+
+    @Override
+    public Map<String, String> getBackendConfigProperties() {
+        Map<String, String> config = new HashMap<>();
+        toNativeS3Configuration(config);
+        return config;
+    }
 }
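The `getBackendConfigProperties` override above is a template method: the abstract base builds the backend map once and delegates the vendor-specific key population to each subclass's `toNativeS3Configuration`. A minimal standalone sketch of that shape (class names and the `AWS_ENDPOINT` key are illustrative, not Doris code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the template-method pattern in the hunk above: the base class
// owns getBackendConfigProperties, subclasses only fill in their keys.
public class BackendConfigSketch {
    abstract static class Base {
        abstract void toNativeS3Configuration(Map<String, String> config);

        Map<String, String> getBackendConfigProperties() {
            Map<String, String> config = new HashMap<>();
            toNativeS3Configuration(config); // subclass hook
            return config;
        }
    }

    static class Cos extends Base {
        @Override
        void toNativeS3Configuration(Map<String, String> config) {
            // hypothetical key/value for illustration
            config.put("AWS_ENDPOINT", "cos.example-region.myqcloud.com");
        }
    }
}
```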
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
index 2db0736cb07..1791ba67a51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.property.storage;

 import org.apache.doris.datasource.property.ConnectorProperty;
 import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;

 import java.util.Map;
 import java.util.regex.Matcher;
@@ -50,13 +51,19 @@ public class COSProperties extends AbstractObjectStorageProperties {
         super(Type.COS, origProps);
     }

+    protected static boolean guessIsMe(Map<String, String> origProps) {
+        return origProps.containsKey("cos.access_key");
+    }
+
     @Override
-    public void toHadoopConfiguration(Map<String, String> config) {
-        config.put("fs.cosn.bucket.region", getRegion());
-        config.put("fs.cos.endpoint", cosEndpoint);
-        config.put("fs.cosn.userinfo.secretId", cosAccessKey);
-        config.put("fs.cosn.userinfo.secretKey", cosSecretKey);
-        config.put("fs.cosn.impl", "org.apache.hadoop.fs.CosFileSystem");
+    public Configuration getHadoopConfiguration() {
+        Configuration conf = new Configuration(false);
+        conf.set("fs.cosn.bucket.region", getRegion());
+        conf.set("fs.cos.endpoint", cosEndpoint);
+        conf.set("fs.cosn.userinfo.secretId", cosAccessKey);
+        conf.set("fs.cosn.userinfo.secretKey", cosSecretKey);
+        conf.set("fs.cosn.impl", "org.apache.hadoop.fs.CosFileSystem");
+        return conf;
     }

     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
index f4c684183ca..0b30b3f59b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
@@ -20,8 +20,10 @@ package org.apache.doris.datasource.property.storage;

 import org.apache.doris.datasource.property.ConnectorProperty;
 import com.google.common.base.Strings;
+import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.conf.Configuration;

+import java.util.HashMap;
 import java.util.Map;

 public class HDFSProperties extends StorageProperties {
@@ -57,8 +59,33 @@ public class HDFSProperties extends StorageProperties {
             description = "Whether to enable the impersonation of HDFS.")
     private boolean hdfsImpersonationEnabled = false;

+    /**
+     * The final HDFS configuration map that determines the effective settings.
+     * Priority rules:
+     * 1. If a key exists in `overrideConfig` (user-provided settings), its value takes precedence.
+     * 2. If a key is not present in `overrideConfig`, the value from `hdfs-site.xml` or `core-site.xml` is used.
+     * 3. This map should be used to read the resolved HDFS configuration, ensuring the correct precedence is applied.
+     */
+    Map<String, String> finalHdfsConfig;
+
     public HDFSProperties(Map<String, String> origProps) {
         super(Type.HDFS, origProps);
+        // to be care setOrigProps(matchParams);
+        loadFinalHdfsConfig(origProps);
+    }
+
+    private void loadFinalHdfsConfig(Map<String, String> origProps) {
+        if (MapUtils.isEmpty(origProps)) {
+            return;
+        }
+        finalHdfsConfig = new HashMap<>();
+        Configuration configuration = new Configuration();
+        origProps.forEach((k, v) -> {
+            if (null != configuration.getTrimmed(k)) {
+                finalHdfsConfig.put(k, v);
+            }
+        });
     }

     @Override
@@ -88,13 +115,48 @@ public class HDFSProperties extends StorageProperties {
     public void toHadoopConfiguration(Configuration conf) {
         Map<String, String> allProps = loadConfigFromFile(getResourceConfigPropName());
         allProps.forEach(conf::set);
-        conf.set("hdfs.authentication.type", hdfsAuthenticationType);
+        if (MapUtils.isNotEmpty(finalHdfsConfig)) {
+            finalHdfsConfig.forEach(conf::set);
+        }
+        //todo waiting be support should use new params
+        conf.set("hdfs.security.authentication", hdfsAuthenticationType);
+        if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) {
+            conf.set("hadoop.kerberos.principal", hdfsKerberosPrincipal);
+            conf.set("hadoop.kerberos.keytab", hdfsKerberosKeytab);
+        }
+        if (!Strings.isNullOrEmpty(hadoopUsername)) {
+            conf.set("hadoop.username", hadoopUsername);
+        }
+    }
+
+    public Configuration getHadoopConfiguration() {
+        Configuration conf = new Configuration(false);
+        Map<String, String> allProps = loadConfigFromFile(getResourceConfigPropName());
+        allProps.forEach(conf::set);
+        if (MapUtils.isNotEmpty(finalHdfsConfig)) {
+            finalHdfsConfig.forEach(conf::set);
+        }
+        conf.set("hdfs.security.authentication", hdfsAuthenticationType);
         if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) {
-            conf.set("hdfs.authentication.kerberos.principal", hdfsKerberosPrincipal);
-            conf.set("hdfs.authentication.kerberos.keytab", hdfsKerberosKeytab);
+            conf.set("hadoop.kerberos.principal", hdfsKerberosPrincipal);
+            conf.set("hadoop.kerberos.keytab", hdfsKerberosKeytab);
         }
         if (!Strings.isNullOrEmpty(hadoopUsername)) {
             conf.set("hadoop.username", hadoopUsername);
         }
+
+        return conf;
+    }
+
+    //fixme be should send use input params
+    @Override
+    public Map<String, String> getBackendConfigProperties() {
+        Configuration configuration = getHadoopConfiguration();
+        Map<String, String> backendConfigProperties = new HashMap<>();
+        for (Map.Entry<String, String> entry : configuration) {
+            backendConfigProperties.put(entry.getKey(), entry.getValue());
+        }

+        return backendConfigProperties;
+    }
 }
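The `finalHdfsConfig` javadoc above describes a simple precedence rule: site-file values (`hdfs-site.xml` / `core-site.xml`) form the base, and any key also present in the user-provided override map wins. That merge can be sketched standalone (hypothetical class name, plain maps instead of Hadoop `Configuration`):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the finalHdfsConfig precedence documented in the hunk above:
// site-config values are the base; user overrides replace matching keys.
public class HdfsConfigMerge {
    public static Map<String, String> merge(Map<String, String> siteConfig,
                                            Map<String, String> overrideConfig) {
        Map<String, String> resolved = new HashMap<>(siteConfig);
        resolved.putAll(overrideConfig); // rule 1: user-provided keys win
        return resolved;                 // rule 2: untouched keys keep site values
    }
}
```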
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
index 2e36386fb64..1cdb4a83139 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.property.storage;

 import org.apache.doris.datasource.property.ConnectorProperty;
 import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;

 import java.util.Map;
 import java.util.regex.Matcher;
@@ -44,13 +45,19 @@ public class OBSProperties extends AbstractObjectStorageProperties {
         // Initialize fields from origProps
     }

+    protected static boolean guessIsMe(Map<String, String> origProps) {
+        return origProps.containsKey("obs.access_key");
+    }
+
     @Override
-    public void toHadoopConfiguration(Map<String, String> config) {
-        config.put("fs.obs.endpoint", obsEndpoint);
-        config.put("fs.obs.access.key", obsAccessKey);
-        config.put("fs.obs.secret.key", obsSecretKey);
-        config.put("fs.obs.impl", "org.apache.hadoop.fs.obs.OBSFileSystem");
-        //set other k v if nessesary
+    public Configuration getHadoopConfiguration() {
+        Configuration conf = new Configuration(false);
+        conf.set("fs.obs.endpoint", obsEndpoint);
+        conf.set("fs.obs.access.key", obsAccessKey);
+        conf.set("fs.obs.secret.key", obsSecretKey);
+        conf.set("fs.obs.impl", "org.apache.hadoop.fs.obs.OBSFileSystem");
+        return conf;
     }

     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
index 1498e4e7b1a..712a1ab61e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.property.storage;

 import org.apache.doris.datasource.property.ConnectorProperty;
 import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;

 import java.util.Map;
 import java.util.regex.Matcher;
@@ -42,12 +43,18 @@ public class OSSProperties extends AbstractObjectStorageProperties {
         super(Type.OSS, origProps);
     }

+    protected static boolean guessIsMe(Map<String, String> origProps) {
+        return origProps.containsKey("oss.access_key");
+    }
+
     @Override
-    public void toHadoopConfiguration(Map<String, String> config) {
-        config.put("fs.oss.endpoint", endpoint);
-        config.put("fs.oss.accessKeyId", accessKey);
-        config.put("fs.oss.accessKeySecret", secretKey);
-        config.put("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+    public Configuration getHadoopConfiguration() {
+        Configuration conf = new Configuration(false);
+        conf.set("fs.oss.endpoint", endpoint);
+        conf.set("fs.oss.accessKeyId", accessKey);
+        conf.set("fs.oss.accessKeySecret", secretKey);
+        conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+        return conf;
     }

     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
index a05d606d6da..2465fbf6192 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
@@ -17,6 +17,8 @@

 package org.apache.doris.datasource.property.storage;

+import org.apache.hadoop.conf.Configuration;
+
 import java.util.Map;

 /**
@@ -34,7 +36,7 @@ public interface ObjectStorageProperties {
      * @param config a map to populate with the HDFS-compatible configuration parameters.
      *               These parameters will be used by Hadoop clients to connect to the object storage system.
      */
-    void toHadoopConfiguration(Map<String, String> config);
+    Configuration getHadoopConfiguration();

     /**
      * Converts the object storage properties to a configuration map compatible with the
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index 4b1eb4e0012..ad4fcd2eb81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -23,9 +23,11 @@ import org.apache.doris.datasource.property.metastore.AliyunDLFProperties;

 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.options.Options;

 import java.lang.reflect.Field;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

@@ -49,11 +51,6 @@ public class S3Properties extends AbstractObjectStorageProperties {
             description = "The secret key of S3.")
     protected String s3SecretKey = "";

-    @ConnectorProperty(names = {"use_path_style",
-            "s3.path-style-access"},
-            required = false,
-            description = "Whether to use path style access.")
-    protected String usePathStyle = "false";

     @ConnectorProperty(names = {"s3.connection.maximum", "AWS_MAX_CONNECTIONS"},
@@ -150,25 +147,34 @@ public class S3Properties extends AbstractObjectStorageProperties {
         catalogProps.put("s3.access-key-id", s3AccessKey);
         catalogProps.put("s3.secret-access-key", s3SecretKey);
         catalogProps.put("client.region", s3Region);
-        catalogProps.put("s3.path-style-access", usePathStyle);
+        catalogProps.put("s3.path-style-access", Boolean.toString(usePathStyle));
     }

     @Override
-    public void toHadoopConfiguration(Map<String, String> config) {
-        config.put("fs.s3a.access.key", s3AccessKey); // AWS Access Key
-        config.put("fs.s3a.secret.key", s3SecretKey); // AWS Secret Key
-        config.put("fs.s3a.endpoint", s3Endpoint);
-        config.put("fs.s3a.region", s3Region);
-        config.put("fs.s3a.connection.maximum", String.valueOf(s3ConnectionMaximum));
-        config.put("fs.s3a.connection.timeout", String.valueOf(s3ConnectionRequestTimeoutS));
-        config.put("fs.s3a.request.timeout", String.valueOf(s3ConnectionTimeoutS));
-        config.put("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+    public Configuration getHadoopConfiguration() {
+        Configuration conf = new Configuration(false);
+        conf.set("fs.s3a.access.key", s3AccessKey);
+        conf.set("fs.s3a.secret.key", s3SecretKey);
+        conf.set("fs.s3a.endpoint", s3Endpoint);
+        conf.set("fs.s3a.region", s3Region);
+        conf.set("fs.s3a.connection.maximum", String.valueOf(s3ConnectionMaximum));
+        conf.set("fs.s3a.connection.timeout", String.valueOf(s3ConnectionRequestTimeoutS));
+        conf.set("fs.s3a.request.timeout", String.valueOf(s3ConnectionTimeoutS));
+        conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+        return conf;
    }

     @Override
     public void toNativeS3Configuration(Map<String, String> config) {
         Map<String, String> awsS3Properties = generateAWSS3Properties(s3Endpoint, s3Region, s3AccessKey, s3SecretKey,
-                s3ConnectionMaximum, s3ConnectionRequestTimeoutS, s3ConnectionTimeoutS, usePathStyle);
+                s3ConnectionMaximum, s3ConnectionRequestTimeoutS, s3ConnectionTimeoutS, String.valueOf(usePathStyle));
         config.putAll(awsS3Properties);
     }
+
+    @Override
+    public Map<String, String> getBackendConfigProperties() {
+        Map<String, String> config = new HashMap<>();
+        toNativeS3Configuration(config);
+        return config;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 8db195246e4..8fdc486b7e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -28,7 +28,7 @@ import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;

-public class StorageProperties extends ConnectionProperties {
+public abstract class StorageProperties extends ConnectionProperties {
     public static final String FS_HDFS_SUPPORT = "fs.hdfs.support";
     public static final String FS_S3_SUPPORT = "fs.s3.support";
@@ -47,6 +47,8 @@ public class StorageProperties extends ConnectionProperties {
         UNKNOWN
     }

+    public abstract Map<String, String> getBackendConfigProperties();
+
     @Getter
     protected Type type;
@@ -102,6 +104,35 @@ public class StorageProperties extends ConnectionProperties {
         return storageProperties;
     }

+    public static StorageProperties createStorageProperties(Map<String, String> origProps) {
+        StorageProperties storageProperties = null;
+        // 1. parse the storage properties by user specified fs.xxx.support properties
+        if (isFsSupport(origProps, FS_HDFS_SUPPORT)) {
+            storageProperties = new HDFSProperties(origProps);
+        }
+
+        if (isFsSupport(origProps, FS_S3_SUPPORT) || S3Properties.guessIsMe(origProps)) {
+            storageProperties = new S3Properties(origProps);
+        }
+        if (isFsSupport(origProps, FS_OSS_SUPPORT) || OSSProperties.guessIsMe(origProps)) {
+            storageProperties = new OSSProperties(origProps);
+        }
+        if (isFsSupport(origProps, FS_OBS_SUPPORT) || OBSProperties.guessIsMe(origProps)) {
+            storageProperties = new OBSProperties(origProps);
+        }
+        if (isFsSupport(origProps, FS_COS_SUPPORT) || COSProperties.guessIsMe(origProps)) {
+            storageProperties = new COSProperties(origProps);
+        }
+        if (null == storageProperties) {
+            throw new RuntimeException("not support this fs");
+        }
+        storageProperties.normalizedAndCheckProps();
+        //load from default file
+        return storageProperties;
+
+    }
+
+
     protected StorageProperties(Type type, Map<String, String> origProps) {
         super(origProps);
         this.type = type;
@@ -123,4 +154,5 @@ public class StorageProperties extends ConnectionProperties {
         }
         return false;
     }
+
 }
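`createStorageProperties` above detects the backend from explicit `fs.xxx.support` flags or each vendor's `guessIsMe` key check, and because the checks are plain `if`s rather than `else if`s, a later match overrides an earlier one. A standalone sketch of that detection order; the `oss/obs/cos.access_key` key names come from the diff, while the `s3.access_key` key (the body of `S3Properties.guessIsMe` is not shown in this diff) is an assumption:

```java
import java.util.Map;

// Sketch of the detection chain in createStorageProperties above.
// Checks are deliberately NOT chained with else-if, so later matches win.
public class StorageDetector {
    public static String detect(Map<String, String> props) {
        String result = null;
        if ("true".equalsIgnoreCase(props.get("fs.hdfs.support"))) {
            result = "HDFS";
        }
        if (props.containsKey("s3.access_key")) { // assumed guessIsMe key for S3
            result = "S3";
        }
        if (props.containsKey("oss.access_key")) {
            result = "OSS";
        }
        if (props.containsKey("obs.access_key")) {
            result = "OBS";
        }
        if (props.containsKey("cos.access_key")) {
            result = "COS";
        }
        if (result == null) {
            throw new RuntimeException("not support this fs");
        }
        return result;
    }
}
```

The non-chained `if`s mean a property map that matches two vendors resolves to whichever check runs last, which may be worth documenting in the real code.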
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageTypeMapper.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageTypeMapper.java
new file mode 100644
index 00000000000..4cff2a1a675
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageTypeMapper.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.fs.remote.S3FileSystem;
+import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+
+import java.util.Arrays;
+import java.util.function.Function;
+
+public enum StorageTypeMapper {
+ HDFS(HDFSProperties.class, DFSFileSystem::new),
+ OSS(OSSProperties.class, S3FileSystem::new),
+ OBS(OBSProperties.class, S3FileSystem::new),
+ COS(COSProperties.class, S3FileSystem::new),
+ S3(S3Properties.class, S3FileSystem::new);
+
+ private final Class<? extends StorageProperties> propClass;
+ private final Function<StorageProperties, RemoteFileSystem> factory;
+
+    <T extends StorageProperties> StorageTypeMapper(Class<T> propClass, Function<T, RemoteFileSystem> factory) {
+ this.propClass = propClass;
+ this.factory = (prop) -> factory.apply(propClass.cast(prop));
+ }
+
+ public static RemoteFileSystem create(StorageProperties prop) {
+ return Arrays.stream(values())
+ .filter(type -> type.propClass.isInstance(prop))
+ .findFirst()
+                .orElseThrow(() -> new RuntimeException("Unknown storage type"))
+ .factory.apply(prop);
+ }
+}
+
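The new `StorageTypeMapper` enum pairs each properties class with a file-system constructor reference and dispatches on the runtime type of the properties instance. A compilable standalone sketch of the same pattern, using placeholder classes instead of the Doris types so it runs without the codebase:

```java
import java.util.Arrays;
import java.util.function.Function;

// Standalone sketch of the StorageTypeMapper pattern introduced above:
// each enum constant pairs a properties class with a factory, and create()
// picks the first constant whose class matches the runtime instance.
public class MapperSketch {
    interface Props {}                       // stand-in for StorageProperties
    static class HdfsProps implements Props {}
    static class S3Props implements Props {}

    enum Mapper {
        HDFS(HdfsProps.class, p -> "DFSFileSystem"),  // stand-in factories
        S3(S3Props.class, p -> "S3FileSystem");

        final Class<? extends Props> propClass;
        final Function<Props, String> factory;

        <T extends Props> Mapper(Class<T> propClass, Function<T, String> factory) {
            this.propClass = propClass;
            // cast once here so each factory only ever sees its own type
            this.factory = p -> factory.apply(propClass.cast(p));
        }

        static String create(Props prop) {
            return Arrays.stream(values())
                    .filter(t -> t.propClass.isInstance(prop))
                    .findFirst()
                    .orElseThrow(() -> new RuntimeException("Unknown storage type"))
                    .factory.apply(prop);
        }
    }
}
```

The generic constructor plus the single `propClass.cast` is what lets each constant take a typed factory (`DFSFileSystem::new` in the real code) while the field stays uniformly typed.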
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index fb23005f4ac..dd57e52ce4a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -18,14 +18,9 @@
package org.apache.doris.fs;
import org.apache.doris.analysis.StorageBackend;
-import org.apache.doris.datasource.property.constants.AzureProperties;
-import org.apache.doris.fs.remote.AzureFileSystem;
-import org.apache.doris.fs.remote.BrokerFileSystem;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.datasource.property.storage.StorageTypeMapper;
import org.apache.doris.fs.remote.RemoteFileSystem;
-import org.apache.doris.fs.remote.S3FileSystem;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-import org.apache.doris.fs.remote.dfs.JFSFileSystem;
-import org.apache.doris.fs.remote.dfs.OFSFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -36,7 +31,7 @@ import java.util.Map;
public class FileSystemFactory {
public static RemoteFileSystem get(String name, StorageBackend.StorageType
type, Map<String, String> properties) {
- // TODO: rename StorageBackend.StorageType
+ /*// TODO: rename StorageBackend.StorageType
if (type == StorageBackend.StorageType.S3) {
if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
return new AzureFileSystem(properties);
@@ -52,12 +47,18 @@ public class FileSystemFactory {
return new BrokerFileSystem(name, properties);
} else {
throw new UnsupportedOperationException(type.toString() + "backend
is not implemented");
- }
+ }*/
+ return null;
}
+ public static RemoteFileSystem get(StorageProperties storageProperties) {
+ return StorageTypeMapper.create(storageProperties);
+ }
+
+
public static RemoteFileSystem getRemoteFileSystem(FileSystemType type,
Map<String, String> properties,
String bindBrokerName) {
- switch (type) {
+ /*switch (type) {
case S3:
if
(AzureProperties.checkAzureProviderPropertyExist(properties)) {
return new AzureFileSystem(properties);
@@ -76,7 +77,8 @@ public class FileSystemFactory {
return new AzureFileSystem(properties);
default:
                throw new IllegalStateException("Not supported file system type: " + type);
- }
+ }*/
+ return null;
}
    public static RemoteFileSystem getS3FileSystem(Map<String, String> properties) {
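The rewritten factory reduces to a single delegation to `StorageTypeMapper.create(...)`. As a rough, self-contained illustration of that dispatch idea — every name below is a simplified stand-in invented for this sketch, not the real Doris API — a type-to-constructor registry might look like:

```java
import java.util.function.Function;

// Simplified stand-ins for StorageProperties subtypes (hypothetical names).
interface StorageProps { }
class HdfsProps implements StorageProps { }
class S3Props implements StorageProps { }

// Stand-in for RemoteFileSystem.
interface RemoteFs { String name(); }
class DfsFs implements RemoteFs {
    private final StorageProps props;
    DfsFs(StorageProps props) { this.props = props; }
    public String name() { return "HDFS"; }
}
class S3Fs implements RemoteFs {
    private final StorageProps props;
    S3Fs(StorageProps props) { this.props = props; }
    public String name() { return "S3"; }
}

// One registry maps a properties type to a file-system constructor,
// so the factory no longer branches on storage type itself.
enum StorageTypeMapperSketch {
    HDFS(HdfsProps.class, DfsFs::new),
    S3(S3Props.class, S3Fs::new);

    private final Class<? extends StorageProps> propsType;
    private final Function<StorageProps, RemoteFs> ctor;

    StorageTypeMapperSketch(Class<? extends StorageProps> propsType,
                            Function<StorageProps, RemoteFs> ctor) {
        this.propsType = propsType;
        this.ctor = ctor;
    }

    static RemoteFs create(StorageProps props) {
        // Pick the first entry whose properties type matches the argument.
        for (StorageTypeMapperSketch m : values()) {
            if (m.propsType.isInstance(props)) {
                return m.ctor.apply(props);
            }
        }
        throw new IllegalStateException("Unsupported storage properties: " + props);
    }
}

class FileSystemFactorySketch {
    static RemoteFs get(StorageProps storageProps) {
        return StorageTypeMapperSketch.create(storageProps);
    }
}
```

Adding a new backend then means adding one properties class and one registry entry, with no change to factory callers.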
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java
index 9cb156036d9..5ae1255a190 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java
@@ -19,10 +19,12 @@ package org.apache.doris.fs;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.io.Text;
+import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.persist.gson.GsonPreProcessable;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
import java.io.DataInput;
import java.io.IOException;
@@ -39,6 +41,9 @@ public abstract class PersistentFileSystem implements FileSystem, GsonPreProcess
public String name;
public StorageBackend.StorageType type;
+ @Getter
+ protected StorageProperties storageProperties;
+
public boolean needFullPath() {
return type == StorageBackend.StorageType.S3
|| type == StorageBackend.StorageType.OFS
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index be53ffde2e0..5910ed80c9e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -22,9 +22,8 @@ import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.datasource.property.storage.AbstractObjectStorageProperties;
import org.apache.doris.fs.obj.S3ObjStorage;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.google.common.annotations.VisibleForTesting;
@@ -44,10 +43,17 @@ public class S3FileSystem extends ObjFileSystem {
private static final Logger LOG = LogManager.getLogger(S3FileSystem.class);
private HadoopAuthenticator authenticator = null;
+ private AbstractObjectStorageProperties s3Properties;
- public S3FileSystem(Map<String, String> properties) {
-        super(StorageBackend.StorageType.S3.name(), StorageBackend.StorageType.S3, new S3ObjStorage(properties));
+
+    public S3FileSystem(AbstractObjectStorageProperties s3Properties) {
+
+        super(StorageBackend.StorageType.S3.name(), StorageBackend.StorageType.S3,
+                new S3ObjStorage(s3Properties.getOrigProps()));
+ this.s3Properties = s3Properties;
+ this.storageProperties = s3Properties;
initFsProperties();
+
}
@VisibleForTesting
@@ -72,14 +78,8 @@ public class S3FileSystem extends ObjFileSystem {
throw new UserException("FileSystem is closed.");
}
if (dfsFileSystem == null) {
-            Configuration conf = DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth());
+            Configuration conf = s3Properties.getHadoopConfiguration();
            System.setProperty("com.amazonaws.services.s3.enableV4", "true");
-            // the entry value in properties may be null, and
-            PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
-                    .filter(entry -> entry.getKey() != null && entry.getValue() != null)
-                    .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
-            // S3 does not support Kerberos authentication,
-            // so here we create a simple authentication
            AuthenticationConfig authConfig = AuthenticationConfig.getSimpleAuthenticationConfig(conf);
            HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 89f4af2817e..49023f5a989 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -20,9 +20,9 @@ package org.apache.doris.fs.remote.dfs;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.common.util.URI;
+import org.apache.doris.datasource.property.storage.HDFSProperties;
import org.apache.doris.fs.operations.HDFSFileOperations;
import org.apache.doris.fs.operations.HDFSOpParams;
import org.apache.doris.fs.operations.OpParams;
@@ -55,7 +55,6 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
public class DFSFileSystem extends RemoteFileSystem {
@@ -63,14 +62,20 @@ public class DFSFileSystem extends RemoteFileSystem {
    private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class);
private HDFSFileOperations operations = null;
private HadoopAuthenticator authenticator = null;
+ private HDFSProperties hdfsProperties;
-    public DFSFileSystem(Map<String, String> properties) {
-        this(StorageBackend.StorageType.HDFS, properties);
+    public DFSFileSystem(HDFSProperties hdfsProperties) {
+        super(StorageBackend.StorageType.HDFS.name(), StorageBackend.StorageType.HDFS);
+        this.properties.putAll(hdfsProperties.getOrigProps());
+        this.hdfsProperties = hdfsProperties;
    }
-    public DFSFileSystem(StorageBackend.StorageType type, Map<String, String> properties) {
-        super(type.name(), type);
+    public DFSFileSystem(HDFSProperties hdfsProperties, StorageBackend.StorageType storageType) {
+        super(storageType.name(), storageType);
+        this.properties.putAll(hdfsProperties.getOrigProps());
+        this.hdfsProperties = hdfsProperties;
    }
@VisibleForTesting
@@ -85,12 +90,8 @@ public class DFSFileSystem extends RemoteFileSystem {
throw new UserException("FileSystem is closed.");
}
if (dfsFileSystem == null) {
-            Configuration conf = getHdfsConf(ifNotSetFallbackToSimpleAuth());
-            for (Map.Entry<String, String> propEntry : properties.entrySet()) {
-                conf.set(propEntry.getKey(), propEntry.getValue());
-            }
-            AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf);
-            authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
+            Configuration conf = hdfsProperties.getHadoopConfiguration();
+            authenticator = HadoopAuthenticator.getHadoopAuthenticator(conf);
try {
dfsFileSystem = authenticator.doAs(() -> {
try {
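The DFSFileSystem change above moves all Configuration assembly out of the file system and into `HDFSProperties`. A minimal sketch of that ownership shift, using a plain Map in place of Hadoop's `Configuration` and hypothetical class names (`HdfsPropsSketch`, `DfsFsSketch` are inventions for this example, and the `fs.defaultFS` default is illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for HDFSProperties: it owns the translation from
// user-facing keys to the (here Map-based) Hadoop-style configuration,
// so the file system no longer copies raw entries itself.
class HdfsPropsSketch {
    private final Map<String, String> origProps;

    HdfsPropsSketch(Map<String, String> origProps) {
        this.origProps = new HashMap<>(origProps);
    }

    Map<String, String> getOrigProps() { return origProps; }

    // Analogue of getHadoopConfiguration(): one place that filters
    // null entries and applies defaults.
    Map<String, String> getHadoopConfiguration() {
        Map<String, String> conf = new HashMap<>();
        for (Map.Entry<String, String> e : origProps.entrySet()) {
            if (e.getKey() != null && e.getValue() != null) {
                conf.put(e.getKey(), e.getValue());
            }
        }
        conf.putIfAbsent("fs.defaultFS", "hdfs://localhost:8020");
        return conf;
    }
}

class DfsFsSketch {
    private final HdfsPropsSketch props;

    DfsFsSketch(HdfsPropsSketch props) { this.props = props; }

    // The file system just asks the property object for a ready configuration.
    Map<String, String> nativeConf() { return props.getHadoopConfiguration(); }
}
```

The same pattern is what lets the Kerberos/simple-auth decision move behind the property object instead of living in each file system.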
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/JFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/JFSFileSystem.java
index ffabb211d08..165aae1942a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/JFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/JFSFileSystem.java
@@ -18,11 +18,10 @@
package org.apache.doris.fs.remote.dfs;
import org.apache.doris.analysis.StorageBackend;
-
-import java.util.Map;
+import org.apache.doris.datasource.property.storage.HDFSProperties;
public class JFSFileSystem extends DFSFileSystem {
- public JFSFileSystem(Map<String, String> properties) {
- super(StorageBackend.StorageType.JFS, properties);
+ public JFSFileSystem(HDFSProperties hdfsProperties) {
+ super(hdfsProperties, StorageBackend.StorageType.JFS);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/OFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/OFSFileSystem.java
index dd69a300392..bdc3ea8e771 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/OFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/OFSFileSystem.java
@@ -18,11 +18,10 @@
package org.apache.doris.fs.remote.dfs;
import org.apache.doris.analysis.StorageBackend;
-
-import java.util.Map;
+import org.apache.doris.datasource.property.storage.HDFSProperties;
public class OFSFileSystem extends DFSFileSystem {
- public OFSFileSystem(Map<String, String> properties) {
- super(StorageBackend.StorageType.OFS, properties);
+ public OFSFileSystem(HDFSProperties properties) {
+ super(properties, StorageBackend.StorageType.OFS);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 9031efd0dc2..74883c2f20c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.datasource.tvf.source.TVFScanNode;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.PlanNodeId;
@@ -108,6 +109,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
protected Map<String, String> locationProperties = Maps.newHashMap();
+ protected StorageProperties storageProperties;
protected String filePath;
protected TFileFormatType fileFormatType;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 3defb171a9f..f6714995d67 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -22,12 +22,11 @@ import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.datasource.property.constants.AzureProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.thrift.TFileType;
@@ -77,66 +76,26 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
        // get endpoint first from properties, if not present, get it from s3 uri.
        // If endpoint is missing, exception will be thrown.
- String endpoint = constructEndpoint(otherProps, s3uri);
-        if (!otherProps.containsKey(S3Properties.REGION)) {
-            String region;
-            if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
-                // Azure could run without region
-                region = s3uri.getRegion().orElse("DUMMY-REGION");
-            } else {
-                region = s3uri.getRegion().orElseThrow(() -> new AnalysisException(
-                        String.format("Properties '%s' is required.", S3Properties.REGION)));
-            }
-            otherProps.put(S3Properties.REGION, region);
-        }
-        checkNecessaryS3Properties(otherProps);
-        CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(endpoint,
-                getOrDefaultAndRemove(otherProps, S3Properties.REGION, ""),
-                getOrDefaultAndRemove(otherProps, S3Properties.ACCESS_KEY, ""),
-                getOrDefaultAndRemove(otherProps, S3Properties.SECRET_KEY, ""));
-        if (otherProps.containsKey(S3Properties.SESSION_TOKEN)) {
-            credential.setSessionToken(getOrDefaultAndRemove(otherProps, S3Properties.SESSION_TOKEN, ""));
-        }
-
-        locationProperties = S3Properties.credentialToMap(credential);
locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
+ //todo waiting support for Azure
            // For Azure's compatibility, we need bucket to connect to the blob storage's container
            locationProperties.put(S3Properties.BUCKET, s3uri.getBucket());
        }
-        locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(locationProperties));
+        this.storageProperties = StorageProperties.createStorageProperties(properties);
+        locationProperties.putAll(storageProperties.getBackendConfigProperties());
        locationProperties.putAll(otherProps);
        filePath = NAME + S3URI.SCHEME_DELIM + s3uri.getBucket() + S3URI.PATH_DELIM + s3uri.getKey();
if (FeConstants.runningUnitTest) {
// Just check
- FileSystemFactory.getS3FileSystem(locationProperties);
+ FileSystemFactory.get(storageProperties);
} else {
parseFile();
}
}
-    private String constructEndpoint(Map<String, String> properties, S3URI s3uri) throws AnalysisException {
-        String endpoint;
-        if (!AzureProperties.checkAzureProviderPropertyExist(properties)) {
-            // get endpoint first from properties, if not present, get it from s3 uri.
-            // If endpoint is missing, exception will be thrown.
-            endpoint = getOrDefaultAndRemove(properties, S3Properties.ENDPOINT, s3uri.getEndpoint().orElse(""));
-            if (Strings.isNullOrEmpty(endpoint)) {
-                throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ENDPOINT));
-            }
-        } else {
-            String bucket = s3uri.getBucket();
-            String accountName = properties.getOrDefault(S3Properties.ACCESS_KEY, "");
-            if (accountName.isEmpty()) {
-                throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ACCESS_KEY));
-            }
-            endpoint = String.format(AzureProperties.AZURE_ENDPOINT_TEMPLATE, accountName, bucket);
-        }
-        return endpoint;
-    }
-
private void forwardCompatibleDeprecatedKeys(Map<String, String> props) {
for (String deprecatedKey : DEPRECATED_KEYS) {
String value = props.remove(deprecatedKey);
@@ -146,13 +105,6 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
}
}
-    private void checkNecessaryS3Properties(Map<String, String> props) throws AnalysisException {
-        if (Strings.isNullOrEmpty(props.get(S3Properties.REGION))) {
-            throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.REGION));
-        }
-        // do not check ak and sk, because we can read them from system environment.
-    }
-
    private S3URI getS3Uri(String uri, boolean isPathStyle, boolean forceParsingStandardUri) throws AnalysisException {
try {
return S3URI.create(uri, isPathStyle, forceParsingStandardUri);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogAPITest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogAPITest.java
index 19eaa747b00..7fe062c7d03 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogAPITest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogAPITest.java
@@ -20,6 +20,8 @@ package org.apache.doris.datasource;
import org.apache.doris.backup.Status;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
+import org.apache.doris.datasource.property.storage.HDFSProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
@@ -203,7 +205,8 @@ public class CatalogAPITest {
properties.put("fs.oss.accessKeyId", ak);
properties.put("fs.oss.accessKeySecret", sk);
properties.put("fs.oss.endpoint", "cn-beijing.oss-dls.aliyuncs.com");
- DFSFileSystem fs = new DFSFileSystem(properties);
+        HDFSProperties hdfsProperties = (HDFSProperties) StorageProperties.createStorageProperties(properties);
+ DFSFileSystem fs = new DFSFileSystem(hdfsProperties);
List<RemoteFile> results = new ArrayList<>();
Status st = fs.listFiles(remotePath, false, results);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/HMSPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/HMSPropertiesTest.java
index a51e3e98b33..9b44f80aa59 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/HMSPropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/HMSPropertiesTest.java
@@ -86,7 +86,7 @@ public class HMSPropertiesTest {
private void testHmsToPaimonOptions(HMSProperties hmsProperties) {
Options paimonOptions = new Options();
- hmsProperties.toPaimonOptionsAndConf(paimonOptions);
+ //hmsProperties.toPaimonOptionsAndConf(paimonOptions);
        Assertions.assertEquals("thrift://127.0.0.1:9083", paimonOptions.get("uri"));
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
index 46d63b33209..18a0f710a86 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
@@ -56,8 +56,7 @@ public class COSPropertiesTest {
origProps.put("cos.use_path_style", "true");
origProps.put(StorageProperties.FS_COS_SUPPORT, "true");
        COSProperties cosProperties = (COSProperties) StorageProperties.create(origProps).get(1);
- Map<String, String> config = new HashMap<>();
- cosProperties.toHadoopConfiguration(config);
+ Configuration config = cosProperties.getHadoopConfiguration();
// Validate the configuration
        Assertions.assertEquals("https://cos.example.com", config.get("fs.cos.endpoint"));
@@ -123,12 +122,7 @@ public class COSPropertiesTest {
origProps.put(StorageProperties.FS_COS_SUPPORT, "true");
        COSProperties cosProperties = (COSProperties) StorageProperties.create(origProps).get(1);
- Map<String, String> hdfsParams = new HashMap<>();
- cosProperties.toHadoopConfiguration(hdfsParams);
- Configuration configuration = new Configuration(false);
- for (Map.Entry<String, String> entry : hdfsParams.entrySet()) {
- configuration.set(entry.getKey(), entry.getValue());
- }
+ Configuration configuration = cosProperties.getHadoopConfiguration();
FileSystem fs = FileSystem.get(new URI(hdfsPath), configuration);
FileStatus[] fileStatuses = fs.listStatus(new Path(hdfsPath));
for (FileStatus status : fileStatuses) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
index 90742e61783..d4191d4be26 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
@@ -43,12 +43,12 @@ public class OBSPropertyTest {
origProps.put(StorageProperties.FS_OBS_SUPPORT, "true");
        ObjectStorageProperties properties = (ObjectStorageProperties) StorageProperties.create(origProps).get(1);
- properties.toHadoopConfiguration(origProps);
+ Configuration conf = properties.getHadoopConfiguration();
-        Assertions.assertEquals("https://obs.example.com", origProps.get("fs.obs.endpoint"));
-        Assertions.assertEquals("myOBSAccessKey", origProps.get("fs.obs.access.key"));
-        Assertions.assertEquals("myOBSSecretKey", origProps.get("fs.obs.secret.key"));
-        Assertions.assertEquals("org.apache.hadoop.fs.obs.OBSFileSystem", origProps.get("fs.obs.impl"));
+        Assertions.assertEquals("https://obs.example.com", conf.get("fs.obs.endpoint"));
+        Assertions.assertEquals("myOBSAccessKey", conf.get("fs.obs.access.key"));
+        Assertions.assertEquals("myOBSSecretKey", conf.get("fs.obs.secret.key"));
+        Assertions.assertEquals("org.apache.hadoop.fs.obs.OBSFileSystem", conf.get("fs.obs.impl"));
// Test creation without additional properties
origProps = new HashMap<>();
@@ -111,12 +111,7 @@ public class OBSPropertyTest {
origProps.put("obs.endpoint", "obs.cn-north-4.myhuaweicloud.com");
origProps.put(StorageProperties.FS_OBS_SUPPORT, "true");
        OBSProperties obsProperties = (OBSProperties) StorageProperties.create(origProps).get(1);
- Map<String, String> hdfsParams = new HashMap<>();
- obsProperties.toHadoopConfiguration(hdfsParams);
- Configuration configuration = new Configuration(false);
- for (Map.Entry<String, String> entry : hdfsParams.entrySet()) {
- configuration.set(entry.getKey(), entry.getValue());
- }
+ Configuration configuration = obsProperties.getHadoopConfiguration();
FileSystem fs = FileSystem.get(new URI(hdfsPath), configuration);
FileStatus[] fileStatuses = fs.listStatus(new Path(hdfsPath));
for (FileStatus status : fileStatuses) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
index 097b002f5c8..8002f8781fa 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
@@ -45,11 +45,11 @@ public class OSSPropertiesTest {
origProps.put("oss.secret_key", "myOSSSecretKey");
origProps.put(StorageProperties.FS_OSS_SUPPORT, "true");
        ObjectStorageProperties properties = (ObjectStorageProperties) StorageProperties.create(origProps).get(1);
-        properties.toHadoopConfiguration(origProps);
-        Assertions.assertEquals("https://oss.aliyuncs.com", origProps.get("fs.oss.endpoint"));
-        Assertions.assertEquals("myOSSAccessKey", origProps.get("fs.oss.accessKeyId"));
-        Assertions.assertEquals("myOSSSecretKey", origProps.get("fs.oss.accessKeySecret"));
-        Assertions.assertEquals("org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem", origProps.get("fs.oss.impl"));
+        Configuration conf = properties.getHadoopConfiguration();
+        Assertions.assertEquals("https://oss.aliyuncs.com", conf.get("fs.oss.endpoint"));
+        Assertions.assertEquals("myOSSAccessKey", conf.get("fs.oss.accessKeyId"));
+        Assertions.assertEquals("myOSSSecretKey", conf.get("fs.oss.accessKeySecret"));
+        Assertions.assertEquals("org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem", conf.get("fs.oss.impl"));
origProps = new HashMap<>();
origProps.put("oss.endpoint", "https://oss.aliyuncs.com");
StorageProperties.create(origProps);
@@ -105,12 +105,7 @@ public class OSSPropertiesTest {
origProps.put(StorageProperties.FS_OSS_SUPPORT, "true");
        OSSProperties ossProperties = (OSSProperties) StorageProperties.create(origProps).get(1);
// ossParams.put("fs.AbstractFileSystem.oss.impl",
"com.aliyun.jindodata.oss.JindoOSS");
- Map<String, String> hadoopParams = new HashMap<>();
- ossProperties.toHadoopConfiguration(hadoopParams);
- Configuration configuration = new Configuration(false);
- for (Map.Entry<String, String> entry : hadoopParams.entrySet()) {
- configuration.set(entry.getKey(), entry.getValue());
- }
+ Configuration configuration = ossProperties.getHadoopConfiguration();
FileSystem fs = FileSystem.get(new URI(hdfsPath), configuration);
FileStatus[] fileStatuses = fs.listStatus(new Path(hdfsPath));
for (FileStatus status : fileStatuses) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
index 83eb06afc8b..0e2085b90cc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
@@ -52,9 +52,7 @@ public class S3PropertiesTest {
origProps.put("s3.region", "us-west-1");
origProps.put(StorageProperties.FS_S3_SUPPORT, "true");
        S3Properties s3Properties = (S3Properties) StorageProperties.create(origProps).get(1);
- Map<String, String> config = new HashMap<>();
- s3Properties.toHadoopConfiguration(config);
-
+ Configuration config = s3Properties.getHadoopConfiguration();
// Validate the configuration
        Assertions.assertEquals("myS3AccessKey", config.get("fs.s3a.access.key"));
        Assertions.assertEquals("myS3SecretKey", config.get("fs.s3a.secret.key"));
@@ -118,13 +116,7 @@ public class S3PropertiesTest {
origProps.put("s3.region", "ap-northeast-1");
origProps.put(StorageProperties.FS_S3_SUPPORT, "true");
        S3Properties s3Properties = (S3Properties) StorageProperties.create(origProps).get(1);
-
- Map<String, String> hdfsParams = new HashMap<>();
- s3Properties.toHadoopConfiguration(hdfsParams);
- Configuration configuration = new Configuration(false);
- for (Map.Entry<String, String> entry : hdfsParams.entrySet()) {
- configuration.set(entry.getKey(), entry.getValue());
- }
+ Configuration configuration = s3Properties.getHadoopConfiguration();
FileSystem fs = FileSystem.get(new URI(hdfsPath), configuration);
FileStatus[] fileStatuses = fs.listStatus(new Path(hdfsPath));
for (FileStatus status : fileStatuses) {
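Each of these test diffs replaces the same two-step pattern — fill a `Map` via `toHadoopConfiguration`, then copy it entry by entry into a `Configuration` — with a single `getHadoopConfiguration()` call. A self-contained sketch of that equivalence, where `SimpleConf` is a stand-in for Hadoop's `Configuration` and the `obs.` to `fs.obs.` key mapping is invented for illustration (it is not the real OBSProperties translation table):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for org.apache.hadoop.conf.Configuration.
class SimpleConf {
    private final Map<String, String> kv = new HashMap<>();
    void set(String k, String v) { kv.put(k, v); }
    String get(String k) { return kv.get(k); }
}

// Hypothetical property object illustrating the old and new entry points.
class ObsPropsSketch {
    private final Map<String, String> origProps;

    ObsPropsSketch(Map<String, String> origProps) { this.origProps = origProps; }

    // Old style: the caller supplies a Map, then copies it into a
    // Configuration by hand in every test.
    void toHadoopConfiguration(Map<String, String> out) {
        origProps.forEach((k, v) -> out.put(k.replaceFirst("^obs\\.", "fs.obs."), v));
    }

    // New style: the property object returns a ready-to-use configuration.
    SimpleConf getHadoopConfiguration() {
        SimpleConf conf = new SimpleConf();
        origProps.forEach((k, v) -> conf.set(k.replaceFirst("^obs\\.", "fs.obs."), v));
        return conf;
    }
}
```

Both paths produce the same key/value view; the new one just removes the copy loop from every caller.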
diff --git a/fe/pom.xml b/fe/pom.xml
index f0b1c03ffe7..cad105cd4c6 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -262,6 +262,7 @@ under the License.
<javax.servlet-api.version>3.1.0</javax.servlet-api.version>
<je.version>18.3.14-doris-SNAPSHOT</je.version>
<jflex.version>1.4.3</jflex.version>
+ <re2j.version>1.8</re2j.version>
<jmockit.version>1.49</jmockit.version>
<commons-io.version>2.7</commons-io.version>
<json-simple.version>1.1.1</json-simple.version>
@@ -602,6 +603,11 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@@ -693,6 +699,11 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.google.re2j</groupId>
+ <artifactId>re2j</artifactId>
+ <version>${re2j.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-gson</artifactId>