This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 54acd326c refactor: Migrate oss to reqsign core v2 (#7165)
54acd326c is described below
commit 54acd326c3c0771db01d5ecab3c186e3300831af
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jan 28 18:40:55 2026 +0800
refactor: Migrate oss to reqsign core v2 (#7165)
* refactor: Migrate oss to reqsign core v2
* Make clippy happy
---
core/Cargo.lock | 26 ++++++++-
core/DEPENDENCIES.rust.tsv | 3 +-
core/services/oss/Cargo.toml | 8 +--
core/services/oss/src/backend.rs | 94 +++++++++++++++++++------------
core/services/oss/src/core.rs | 118 ++++++++++++++++-----------------------
core/services/oss/src/writer.rs | 10 ++--
6 files changed, 140 insertions(+), 119 deletions(-)
diff --git a/core/Cargo.lock b/core/Cargo.lock
index 9c18a7c86..d93459540 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -6879,7 +6879,10 @@ dependencies = [
"opendal-core",
"pretty_assertions",
"quick-xml",
- "reqsign",
+ "reqsign-aliyun-oss",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "reqsign-http-send-reqwest",
"serde",
"tokio",
]
@@ -8645,6 +8648,23 @@ dependencies = [
"sha2",
]
+[[package]]
+name = "reqsign-aliyun-oss"
+version = "2.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ca53e7f1b5767da409d861e864d6c2867e710a500103582196bd6fff8f103d32"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "form_urlencoded",
+ "http 1.4.0",
+ "log",
+ "percent-encoding",
+ "reqsign-core",
+ "serde",
+ "serde_json",
+]
+
[[package]]
name = "reqsign-aws-v4"
version = "2.0.1"
@@ -8669,9 +8689,9 @@ dependencies = [
[[package]]
name = "reqsign-core"
-version = "2.0.1"
+version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "39da118ccf3bdb067ac6cc40136fec99bc5ba418cbd388dc88e4ce0e5d0b1423"
+checksum = "9ba66eb941c0f723269a394baf3b19a2fa697a1e593f3e902779df6c35d24e21"
dependencies = [
"anyhow",
"async-trait",
diff --git a/core/DEPENDENCIES.rust.tsv b/core/DEPENDENCIES.rust.tsv
index 38c5b4f61..e91cf64bb 100644
--- a/core/DEPENDENCIES.rust.tsv
+++ b/core/DEPENDENCIES.rust.tsv
@@ -99,8 +99,9 @@ [email protected] X
X
[email protected] X
X
[email protected] X
X
[email protected] X
[email protected] X
[email protected] X
[email protected] X
[email protected] X
[email protected] X
[email protected] X
[email protected] X
X
diff --git a/core/services/oss/Cargo.toml b/core/services/oss/Cargo.toml
index 47eff2941..22ebd0192 100644
--- a/core/services/oss/Cargo.toml
+++ b/core/services/oss/Cargo.toml
@@ -36,10 +36,10 @@ http = { workspace = true }
log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
-reqsign = { workspace = true, features = [
- "services-aliyun",
- "reqwest_request",
-] }
+reqsign-aliyun-oss = { version = "2.0.2", default-features = false }
+reqsign-core = { version = "2.0.1", default-features = false }
+reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
+reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
serde = { workspace = true, features = ["derive"] }
[dev-dependencies]
diff --git a/core/services/oss/src/backend.rs b/core/services/oss/src/backend.rs
index 0b79c196f..c152872d7 100644
--- a/core/services/oss/src/backend.rs
+++ b/core/services/oss/src/backend.rs
@@ -22,9 +22,18 @@ use http::Response;
use http::StatusCode;
use http::Uri;
use log::debug;
-use reqsign::AliyunConfig;
-use reqsign::AliyunLoader;
-use reqsign::AliyunOssSigner;
+use reqsign_aliyun_oss::AssumeRoleWithOidcCredentialProvider;
+use reqsign_aliyun_oss::EnvCredentialProvider;
+use reqsign_aliyun_oss::RequestSigner;
+use reqsign_aliyun_oss::StaticCredentialProvider;
+use reqsign_core::Context;
+use reqsign_core::Env as _;
+use reqsign_core::OsEnv;
+use reqsign_core::ProvideCredentialChain;
+use reqsign_core::Signer;
+use reqsign_core::StaticEnv;
+use reqsign_file_read_tokio::TokioFileRead;
+use reqsign_http_send_reqwest::ReqwestHttpSend;
use super::OSS_SCHEME;
use super::config::OssConfig;
@@ -431,46 +440,63 @@ impl Builder for OssBuilder {
),
};
- let mut cfg = AliyunConfig::default();
- // Load cfg from env first.
- cfg = cfg.from_env();
+ // NOTE: `AssumeRoleWithOidcCredentialProvider` still reads `role_arn`, `oidc_provider_arn`
+ // and `oidc_token_file` from `Context` environment variables at runtime. Until reqsign
+ // exposes typed builder APIs for all of them, we overlay config values into a `StaticEnv`
+ // snapshot here.
+ let os_env = OsEnv;
+ let mut envs = os_env.vars();
- if let Some(v) = self.config.access_key_id {
- cfg.access_key_id = Some(v);
+ if let Some(v) = &self.config.role_arn {
+ envs.insert("ALIBABA_CLOUD_ROLE_ARN".to_string(), v.clone());
}
-
- if let Some(v) = self.config.access_key_secret {
- cfg.access_key_secret = Some(v);
- }
-
- if let Some(v) = self.config.security_token {
- cfg.security_token = Some(v);
+ if let Some(v) = &self.config.oidc_provider_arn {
+ envs.insert("ALIBABA_CLOUD_OIDC_PROVIDER_ARN".to_string(), v.clone());
}
-
- if let Some(v) = self.config.role_arn {
- cfg.role_arn = Some(v);
+ if let Some(v) = &self.config.oidc_token_file {
+ envs.insert("ALIBABA_CLOUD_OIDC_TOKEN_FILE".to_string(), v.clone());
}
- // override default role_session_name if set
- if let Some(v) = self.config.role_session_name {
- cfg.role_session_name = v;
- }
+ let mut assume_role = AssumeRoleWithOidcCredentialProvider::new();
- if let Some(v) = self.config.oidc_provider_arn {
- cfg.oidc_provider_arn = Some(v);
+ if let Some(sts_endpoint) = &self.config.sts_endpoint {
+ if sts_endpoint.starts_with("http://") || sts_endpoint.starts_with("https://") {
+ assume_role = assume_role.with_sts_endpoint(sts_endpoint.clone());
+ } else {
+ envs.insert(
+ "ALIBABA_CLOUD_STS_ENDPOINT".to_string(),
+ sts_endpoint.clone(),
+ );
+ }
}
- if let Some(v) = self.config.oidc_token_file {
- cfg.oidc_token_file = Some(v);
+ if let Some(role_session_name) = &self.config.role_session_name {
+ assume_role = assume_role.with_role_session_name(role_session_name.clone());
}
- if let Some(v) = self.config.sts_endpoint {
- cfg.sts_endpoint = Some(v);
+ let ctx = Context::new()
+ .with_file_read(TokioFileRead)
+ .with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
+ .with_env(StaticEnv {
+ home_dir: os_env.home_dir(),
+ envs,
+ });
+
+ let mut provider = ProvideCredentialChain::new()
+ .push(EnvCredentialProvider::new())
+ .push(assume_role);
+
+ if let (Some(ak), Some(sk)) = (&self.config.access_key_id, &self.config.access_key_secret) {
+ let static_provider = if let Some(token) = self.config.security_token.as_deref() {
+ StaticCredentialProvider::new(ak, sk).with_security_token(token)
+ } else {
+ StaticCredentialProvider::new(ak, sk)
+ };
+ provider = provider.push_front(static_provider);
}
- let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
-
- let signer = AliyunOssSigner::new(bucket);
+ let request_signer = RequestSigner::new(bucket);
+ let signer = Signer::new(ctx, provider, request_signer);
let delete_max_size = self
.config
@@ -554,7 +580,6 @@ impl Builder for OssBuilder {
presign_endpoint,
allow_anonymous: self.config.allow_anonymous,
signer,
- loader,
server_side_encryption,
server_side_encryption_key_id,
}),
@@ -689,9 +714,8 @@ impl Access for OssBackend {
"operation is not supported",
)),
};
- let mut req = req?;
-
- self.core.sign_query(&mut req, args.expire()).await?;
+ let req = req?;
+ let req = self.core.sign_query(req, args.expire()).await?;
// We don't need this request anymore, consume it directly.
let (parts, _) = req.into_parts();
diff --git a/core/services/oss/src/core.rs b/core/services/oss/src/core.rs
index 9df696380..b9f236d69 100644
--- a/core/services/oss/src/core.rs
+++ b/core/services/oss/src/core.rs
@@ -34,9 +34,8 @@ use http::header::IF_MODIFIED_SINCE;
use http::header::IF_NONE_MATCH;
use http::header::IF_UNMODIFIED_SINCE;
use http::header::RANGE;
-use reqsign::AliyunCredential;
-use reqsign::AliyunLoader;
-use reqsign::AliyunOssSigner;
+use reqsign_aliyun_oss::Credential;
+use reqsign_core::Signer;
use serde::Deserialize;
use serde::Serialize;
@@ -76,8 +75,7 @@ pub struct OssCore {
pub server_side_encryption: Option<HeaderValue>,
pub server_side_encryption_key_id: Option<HeaderValue>,
- pub loader: AliyunLoader,
- pub signer: AliyunOssSigner,
+ pub signer: Signer<Credential>,
}
impl Debug for OssCore {
@@ -92,48 +90,36 @@ impl Debug for OssCore {
}
impl OssCore {
- async fn load_credential(&self) -> Result<Option<AliyunCredential>> {
- let cred = self
- .loader
- .load()
- .await
- .map_err(new_request_credential_error)?;
-
- if let Some(cred) = cred {
- Ok(Some(cred))
- } else if self.allow_anonymous {
- // If allow_anonymous has been set, we will not sign the request.
- Ok(None)
- } else {
- // Mark this error as temporary since it could be caused by Aliyun STS.
- Err(Error::new(
- ErrorKind::PermissionDenied,
- "no valid credential found, please check configuration or try again",
- )
- .set_temporary())
+ pub async fn sign<T>(&self, req: Request<T>) -> Result<Request<T>> {
+ // Skip signing for anonymous access.
+ if self.allow_anonymous {
+ return Ok(req);
}
- }
- pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
- let cred = if let Some(cred) = self.load_credential().await? {
- cred
- } else {
- return Ok(());
- };
+ let (mut parts, body) = req.into_parts();
- self.signer.sign(req, &cred).map_err(new_request_sign_error)
+ self.signer
+ .sign(&mut parts, None)
+ .await
+ .map_err(|e| new_request_sign_error(e.into()))?;
+
+ Ok(Request::from_parts(parts, body))
}
- pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
- let cred = if let Some(cred) = self.load_credential().await? {
- cred
- } else {
- return Ok(());
- };
+ pub async fn sign_query<T>(&self, req: Request<T>, duration: Duration) -> Result<Request<T>> {
+ // Skip signing for anonymous access.
+ if self.allow_anonymous {
+ return Ok(req);
+ }
+
+ let (mut parts, body) = req.into_parts();
self.signer
- .sign_query(req, duration, &cred)
- .map_err(new_request_sign_error)
+ .sign(&mut parts, Some(duration))
+ .await
+ .map_err(|e| new_request_sign_error(e.into()))?;
+
+ Ok(Request::from_parts(parts, body))
}
#[inline]
@@ -475,15 +461,14 @@ impl OssCore {
}
pub async fn oss_get_object(&self, path: &str, args: &OpRead) -> Result<Response<HttpBody>> {
- let mut req = self.oss_get_object_request(path, false, args)?;
- self.sign(&mut req).await?;
+ let req = self.oss_get_object_request(path, false, args)?;
+ let req = self.sign(req).await?;
self.info.http_client().fetch(req).await
}
pub async fn oss_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
- let mut req = self.oss_head_object_request(path, false, args)?;
-
- self.sign(&mut req).await?;
+ let req = self.oss_head_object_request(path, false, args)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -506,9 +491,8 @@ impl OssCore {
let req = req.extension(Operation::Copy);
- let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -520,9 +504,8 @@ impl OssCore {
limit: Option<usize>,
start_after: Option<String>,
) -> Result<Response<Buffer>> {
- let mut req = self.oss_list_object_request(path, token, delimiter, limit, start_after)?;
-
- self.sign(&mut req).await?;
+ let req = self.oss_list_object_request(path, token, delimiter, limit, start_after)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -556,19 +539,17 @@ impl OssCore {
url = url.push("version-id-marker", &percent_encode_path(version_id_marker));
}
- let mut req = Request::get(url.finish())
+ let req = Request::get(url.finish())
.extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;
-
- self.sign(&mut req).await?;
-
+ let req = self.sign(req).await?;
self.send(req).await
}
pub async fn oss_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
- let mut req = self.oss_delete_object_request(path, args)?;
- self.sign(&mut req).await?;
+ let req = self.oss_delete_object_request(path, args)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -600,12 +581,10 @@ impl OssCore {
let req = req.extension(Operation::Delete);
- let mut req = req
+ let req = req
.body(Buffer::from(Bytes::from(content)))
.map_err(new_request_build_error)?;
-
- self.sign(&mut req).await?;
-
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -642,8 +621,8 @@ impl OssCore {
let req = req.extension(Operation::Write);
- let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -673,8 +652,8 @@ impl OssCore {
let req = req.extension(Operation::Write);
- let mut req = req.body(body).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(body).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -707,11 +686,10 @@ impl OssCore {
let req = req.extension(Operation::Write);
- let mut req = req
+ let req = req
.body(Buffer::from(Bytes::from(content)))
.map_err(new_request_build_error)?;
-
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -731,11 +709,11 @@ impl OssCore {
percent_encode_path(upload_id)
);
- let mut req = Request::delete(&url)
+ let req = Request::delete(&url)
.extension(Operation::Write)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
}
diff --git a/core/services/oss/src/writer.rs b/core/services/oss/src/writer.rs
index 6668f3386..7a71a9a08 100644
--- a/core/services/oss/src/writer.rs
+++ b/core/services/oss/src/writer.rs
@@ -62,11 +62,10 @@ impl OssWriter {
impl oio::MultipartWrite for OssWriter {
async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
- let mut req =
+ let req =
self.core
.oss_put_object_request(&self.path, Some(size), &self.op, body, false)?;
-
- self.core.sign(&mut req).await?;
+ let req = self.core.sign(req).await?;
let resp = self.core.send(req).await?;
@@ -209,11 +208,10 @@ impl oio::AppendWrite for OssWriter {
}
async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
- let mut req = self
+ let req = self
.core
.oss_append_object_request(&self.path, offset, size, &self.op, body)?;
-
- self.core.sign(&mut req).await?;
+ let req = self.core.sign(req).await?;
let resp = self.core.send(req).await?;