This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 54acd326c refactor: Migrate oss to reqsign core v2 (#7165)
54acd326c is described below

commit 54acd326c3c0771db01d5ecab3c186e3300831af
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jan 28 18:40:55 2026 +0800

    refactor: Migrate oss to reqsign core v2 (#7165)
    
    * refactor: Migrate oss to reqsign core v2
    
    * Make clippy happy
---
 core/Cargo.lock                  |  26 ++++++++-
 core/DEPENDENCIES.rust.tsv       |   3 +-
 core/services/oss/Cargo.toml     |   8 +--
 core/services/oss/src/backend.rs |  94 +++++++++++++++++++------------
 core/services/oss/src/core.rs    | 118 ++++++++++++++++-----------------------
 core/services/oss/src/writer.rs  |  10 ++--
 6 files changed, 140 insertions(+), 119 deletions(-)

diff --git a/core/Cargo.lock b/core/Cargo.lock
index 9c18a7c86..d93459540 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -6879,7 +6879,10 @@ dependencies = [
  "opendal-core",
  "pretty_assertions",
  "quick-xml",
- "reqsign",
+ "reqsign-aliyun-oss",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "reqsign-http-send-reqwest",
  "serde",
  "tokio",
 ]
@@ -8645,6 +8648,23 @@ dependencies = [
  "sha2",
 ]
 
+[[package]]
+name = "reqsign-aliyun-oss"
+version = "2.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ca53e7f1b5767da409d861e864d6c2867e710a500103582196bd6fff8f103d32"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "form_urlencoded",
+ "http 1.4.0",
+ "log",
+ "percent-encoding",
+ "reqsign-core",
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "reqsign-aws-v4"
 version = "2.0.1"
@@ -8669,9 +8689,9 @@ dependencies = [
 
 [[package]]
 name = "reqsign-core"
-version = "2.0.1"
+version = "2.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "39da118ccf3bdb067ac6cc40136fec99bc5ba418cbd388dc88e4ce0e5d0b1423"
+checksum = "9ba66eb941c0f723269a394baf3b19a2fa697a1e593f3e902779df6c35d24e21"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/core/DEPENDENCIES.rust.tsv b/core/DEPENDENCIES.rust.tsv
index 38c5b4f61..e91cf64bb 100644
--- a/core/DEPENDENCIES.rust.tsv
+++ b/core/DEPENDENCIES.rust.tsv
@@ -99,8 +99,9 @@ [email protected]    X                                               
                        X
 [email protected]      X                                                       
                X               
 [email protected]        X                                                       
                X               
 [email protected] X                                                               
                        
[email protected]       X                                               
                                        
 [email protected]   X                                                       
                                
[email protected]     X                                                       
                                
[email protected]     X                                                       
                                
 [email protected]  X                                               
                                        
 [email protected]        X                                       
                                                
 [email protected]        X                                                       
                X               
diff --git a/core/services/oss/Cargo.toml b/core/services/oss/Cargo.toml
index 47eff2941..22ebd0192 100644
--- a/core/services/oss/Cargo.toml
+++ b/core/services/oss/Cargo.toml
@@ -36,10 +36,10 @@ http = { workspace = true }
 log = { workspace = true }
 opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
 quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
-reqsign = { workspace = true, features = [
-  "services-aliyun",
-  "reqwest_request",
-] }
+reqsign-aliyun-oss = { version = "2.0.2", default-features = false }
+reqsign-core = { version = "2.0.1", default-features = false }
+reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
+reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
 serde = { workspace = true, features = ["derive"] }
 
 [dev-dependencies]
diff --git a/core/services/oss/src/backend.rs b/core/services/oss/src/backend.rs
index 0b79c196f..c152872d7 100644
--- a/core/services/oss/src/backend.rs
+++ b/core/services/oss/src/backend.rs
@@ -22,9 +22,18 @@ use http::Response;
 use http::StatusCode;
 use http::Uri;
 use log::debug;
-use reqsign::AliyunConfig;
-use reqsign::AliyunLoader;
-use reqsign::AliyunOssSigner;
+use reqsign_aliyun_oss::AssumeRoleWithOidcCredentialProvider;
+use reqsign_aliyun_oss::EnvCredentialProvider;
+use reqsign_aliyun_oss::RequestSigner;
+use reqsign_aliyun_oss::StaticCredentialProvider;
+use reqsign_core::Context;
+use reqsign_core::Env as _;
+use reqsign_core::OsEnv;
+use reqsign_core::ProvideCredentialChain;
+use reqsign_core::Signer;
+use reqsign_core::StaticEnv;
+use reqsign_file_read_tokio::TokioFileRead;
+use reqsign_http_send_reqwest::ReqwestHttpSend;
 
 use super::OSS_SCHEME;
 use super::config::OssConfig;
@@ -431,46 +440,63 @@ impl Builder for OssBuilder {
             ),
         };
 
-        let mut cfg = AliyunConfig::default();
-        // Load cfg from env first.
-        cfg = cfg.from_env();
+        // NOTE: `AssumeRoleWithOidcCredentialProvider` still reads `role_arn`, `oidc_provider_arn`
+        // and `oidc_token_file` from `Context` environment variables at runtime. Until reqsign
+        // exposes typed builder APIs for all of them, we overlay config values into a `StaticEnv`
+        // snapshot here.
+        let os_env = OsEnv;
+        let mut envs = os_env.vars();
 
-        if let Some(v) = self.config.access_key_id {
-            cfg.access_key_id = Some(v);
+        if let Some(v) = &self.config.role_arn {
+            envs.insert("ALIBABA_CLOUD_ROLE_ARN".to_string(), v.clone());
         }
-
-        if let Some(v) = self.config.access_key_secret {
-            cfg.access_key_secret = Some(v);
-        }
-
-        if let Some(v) = self.config.security_token {
-            cfg.security_token = Some(v);
+        if let Some(v) = &self.config.oidc_provider_arn {
+            envs.insert("ALIBABA_CLOUD_OIDC_PROVIDER_ARN".to_string(), v.clone());
         }
-
-        if let Some(v) = self.config.role_arn {
-            cfg.role_arn = Some(v);
+        if let Some(v) = &self.config.oidc_token_file {
+            envs.insert("ALIBABA_CLOUD_OIDC_TOKEN_FILE".to_string(), v.clone());
         }
 
-        // override default role_session_name if set
-        if let Some(v) = self.config.role_session_name {
-            cfg.role_session_name = v;
-        }
+        let mut assume_role = AssumeRoleWithOidcCredentialProvider::new();
 
-        if let Some(v) = self.config.oidc_provider_arn {
-            cfg.oidc_provider_arn = Some(v);
+        if let Some(sts_endpoint) = &self.config.sts_endpoint {
+            if sts_endpoint.starts_with("http://") || sts_endpoint.starts_with("https://") {
+                assume_role = assume_role.with_sts_endpoint(sts_endpoint.clone());
+            } else {
+                envs.insert(
+                    "ALIBABA_CLOUD_STS_ENDPOINT".to_string(),
+                    sts_endpoint.clone(),
+                );
+            }
         }
 
-        if let Some(v) = self.config.oidc_token_file {
-            cfg.oidc_token_file = Some(v);
+        if let Some(role_session_name) = &self.config.role_session_name {
+            assume_role = assume_role.with_role_session_name(role_session_name.clone());
         }
 
-        if let Some(v) = self.config.sts_endpoint {
-            cfg.sts_endpoint = Some(v);
+        let ctx = Context::new()
+            .with_file_read(TokioFileRead)
+            .with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
+            .with_env(StaticEnv {
+                home_dir: os_env.home_dir(),
+                envs,
+            });
+
+        let mut provider = ProvideCredentialChain::new()
+            .push(EnvCredentialProvider::new())
+            .push(assume_role);
+
+        if let (Some(ak), Some(sk)) = (&self.config.access_key_id, &self.config.access_key_secret) {
+            let static_provider = if let Some(token) = self.config.security_token.as_deref() {
+                StaticCredentialProvider::new(ak, sk).with_security_token(token)
+            } else {
+                StaticCredentialProvider::new(ak, sk)
+            };
+            provider = provider.push_front(static_provider);
         }
 
-        let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
-
-        let signer = AliyunOssSigner::new(bucket);
+        let request_signer = RequestSigner::new(bucket);
+        let signer = Signer::new(ctx, provider, request_signer);
 
         let delete_max_size = self
             .config
@@ -554,7 +580,6 @@ impl Builder for OssBuilder {
                 presign_endpoint,
                 allow_anonymous: self.config.allow_anonymous,
                 signer,
-                loader,
                 server_side_encryption,
                 server_side_encryption_key_id,
             }),
@@ -689,9 +714,8 @@ impl Access for OssBackend {
                 "operation is not supported",
             )),
         };
-        let mut req = req?;
-
-        self.core.sign_query(&mut req, args.expire()).await?;
+        let req = req?;
+        let req = self.core.sign_query(req, args.expire()).await?;
 
         // We don't need this request anymore, consume it directly.
         let (parts, _) = req.into_parts();
diff --git a/core/services/oss/src/core.rs b/core/services/oss/src/core.rs
index 9df696380..b9f236d69 100644
--- a/core/services/oss/src/core.rs
+++ b/core/services/oss/src/core.rs
@@ -34,9 +34,8 @@ use http::header::IF_MODIFIED_SINCE;
 use http::header::IF_NONE_MATCH;
 use http::header::IF_UNMODIFIED_SINCE;
 use http::header::RANGE;
-use reqsign::AliyunCredential;
-use reqsign::AliyunLoader;
-use reqsign::AliyunOssSigner;
+use reqsign_aliyun_oss::Credential;
+use reqsign_core::Signer;
 use serde::Deserialize;
 use serde::Serialize;
 
@@ -76,8 +75,7 @@ pub struct OssCore {
     pub server_side_encryption: Option<HeaderValue>,
     pub server_side_encryption_key_id: Option<HeaderValue>,
 
-    pub loader: AliyunLoader,
-    pub signer: AliyunOssSigner,
+    pub signer: Signer<Credential>,
 }
 
 impl Debug for OssCore {
@@ -92,48 +90,36 @@ impl Debug for OssCore {
 }
 
 impl OssCore {
-    async fn load_credential(&self) -> Result<Option<AliyunCredential>> {
-        let cred = self
-            .loader
-            .load()
-            .await
-            .map_err(new_request_credential_error)?;
-
-        if let Some(cred) = cred {
-            Ok(Some(cred))
-        } else if self.allow_anonymous {
-            // If allow_anonymous has been set, we will not sign the request.
-            Ok(None)
-        } else {
-            // Mark this error as temporary since it could be caused by Aliyun STS.
-            Err(Error::new(
-                ErrorKind::PermissionDenied,
-                "no valid credential found, please check configuration or try again",
-            )
-            .set_temporary())
+    pub async fn sign<T>(&self, req: Request<T>) -> Result<Request<T>> {
+        // Skip signing for anonymous access.
+        if self.allow_anonymous {
+            return Ok(req);
         }
-    }
 
-    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
-        let cred = if let Some(cred) = self.load_credential().await? {
-            cred
-        } else {
-            return Ok(());
-        };
+        let (mut parts, body) = req.into_parts();
 
-        self.signer.sign(req, &cred).map_err(new_request_sign_error)
+        self.signer
+            .sign(&mut parts, None)
+            .await
+            .map_err(|e| new_request_sign_error(e.into()))?;
+
+        Ok(Request::from_parts(parts, body))
     }
 
-    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
-        let cred = if let Some(cred) = self.load_credential().await? {
-            cred
-        } else {
-            return Ok(());
-        };
+    pub async fn sign_query<T>(&self, req: Request<T>, duration: Duration) -> Result<Request<T>> {
+        // Skip signing for anonymous access.
+        if self.allow_anonymous {
+            return Ok(req);
+        }
+
+        let (mut parts, body) = req.into_parts();
 
         self.signer
-            .sign_query(req, duration, &cred)
-            .map_err(new_request_sign_error)
+            .sign(&mut parts, Some(duration))
+            .await
+            .map_err(|e| new_request_sign_error(e.into()))?;
+
+        Ok(Request::from_parts(parts, body))
     }
 
     #[inline]
@@ -475,15 +461,14 @@ impl OssCore {
     }
 
     pub async fn oss_get_object(&self, path: &str, args: &OpRead) -> Result<Response<HttpBody>> {
-        let mut req = self.oss_get_object_request(path, false, args)?;
-        self.sign(&mut req).await?;
+        let req = self.oss_get_object_request(path, false, args)?;
+        let req = self.sign(req).await?;
         self.info.http_client().fetch(req).await
     }
 
     pub async fn oss_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
-        let mut req = self.oss_head_object_request(path, false, args)?;
-
-        self.sign(&mut req).await?;
+        let req = self.oss_head_object_request(path, false, args)?;
+        let req = self.sign(req).await?;
         self.send(req).await
     }
 
@@ -506,9 +491,8 @@ impl OssCore {
 
         let req = req.extension(Operation::Copy);
 
-        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
+        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+        let req = self.sign(req).await?;
         self.send(req).await
     }
 
@@ -520,9 +504,8 @@ impl OssCore {
         limit: Option<usize>,
         start_after: Option<String>,
     ) -> Result<Response<Buffer>> {
-        let mut req = self.oss_list_object_request(path, token, delimiter, limit, start_after)?;
-
-        self.sign(&mut req).await?;
+        let req = self.oss_list_object_request(path, token, delimiter, limit, start_after)?;
+        let req = self.sign(req).await?;
         self.send(req).await
     }
 
@@ -556,19 +539,17 @@ impl OssCore {
             url = url.push("version-id-marker", 
&percent_encode_path(version_id_marker));
         }
 
-        let mut req = Request::get(url.finish())
+        let req = Request::get(url.finish())
             .extension(Operation::List)
             .body(Buffer::new())
             .map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
-
+        let req = self.sign(req).await?;
         self.send(req).await
     }
 
     pub async fn oss_delete_object(&self, path: &str, args: &OpDelete) -> 
Result<Response<Buffer>> {
-        let mut req = self.oss_delete_object_request(path, args)?;
-        self.sign(&mut req).await?;
+        let req = self.oss_delete_object_request(path, args)?;
+        let req = self.sign(req).await?;
         self.send(req).await
     }
 
@@ -600,12 +581,10 @@ impl OssCore {
 
         let req = req.extension(Operation::Delete);
 
-        let mut req = req
+        let req = req
             .body(Buffer::from(Bytes::from(content)))
             .map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
-
+        let req = self.sign(req).await?;
         self.send(req).await
     }
 
@@ -642,8 +621,8 @@ impl OssCore {
 
         let req = req.extension(Operation::Write);
 
-        let mut req = 
req.body(Buffer::new()).map_err(new_request_build_error)?;
-        self.sign(&mut req).await?;
+        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+        let req = self.sign(req).await?;
         self.send(req).await
     }
 
@@ -673,8 +652,8 @@ impl OssCore {
 
         let req = req.extension(Operation::Write);
 
-        let mut req = req.body(body).map_err(new_request_build_error)?;
-        self.sign(&mut req).await?;
+        let req = req.body(body).map_err(new_request_build_error)?;
+        let req = self.sign(req).await?;
         self.send(req).await
     }
 
@@ -707,11 +686,10 @@ impl OssCore {
 
         let req = req.extension(Operation::Write);
 
-        let mut req = req
+        let req = req
             .body(Buffer::from(Bytes::from(content)))
             .map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
+        let req = self.sign(req).await?;
         self.send(req).await
     }
 
@@ -731,11 +709,11 @@ impl OssCore {
             percent_encode_path(upload_id)
         );
 
-        let mut req = Request::delete(&url)
+        let req = Request::delete(&url)
             .extension(Operation::Write)
             .body(Buffer::new())
             .map_err(new_request_build_error)?;
-        self.sign(&mut req).await?;
+        let req = self.sign(req).await?;
         self.send(req).await
     }
 }
diff --git a/core/services/oss/src/writer.rs b/core/services/oss/src/writer.rs
index 6668f3386..7a71a9a08 100644
--- a/core/services/oss/src/writer.rs
+++ b/core/services/oss/src/writer.rs
@@ -62,11 +62,10 @@ impl OssWriter {
 
 impl oio::MultipartWrite for OssWriter {
     async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
-        let mut req =
+        let req =
             self.core
                 .oss_put_object_request(&self.path, Some(size), &self.op, 
body, false)?;
-
-        self.core.sign(&mut req).await?;
+        let req = self.core.sign(req).await?;
 
         let resp = self.core.send(req).await?;
 
@@ -209,11 +208,10 @@ impl oio::AppendWrite for OssWriter {
     }
 
     async fn append(&self, offset: u64, size: u64, body: Buffer) -> 
Result<Metadata> {
-        let mut req = self
+        let req = self
             .core
             .oss_append_object_request(&self.path, offset, size, &self.op, 
body)?;
-
-        self.core.sign(&mut req).await?;
+        let req = self.core.sign(req).await?;
 
         let resp = self.core.send(req).await?;
 

Reply via email to