This is an automated email from the ASF dual-hosted git repository. koushiro pushed a commit to branch migrate-obs-to-reqsign-core-v2 in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 057cda6abc84a21d0e71be201894c634d31b5dd1 Author: koushiro <[email protected]> AuthorDate: Fri Jan 30 19:56:55 2026 +0800 refactor(services/obs): migrate obs to reqsign-core v2 --- core/Cargo.lock | 24 +++++++++-- core/services/obs/Cargo.toml | 8 ++-- core/services/obs/src/backend.rs | 42 ++++++++++-------- core/services/obs/src/core.rs | 93 ++++++++++++++++------------------------ core/services/obs/src/writer.rs | 23 +++++----- 5 files changed, 98 insertions(+), 92 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index d93459540..1ec3ebeea 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -6838,7 +6838,10 @@ dependencies = [ "log", "opendal-core", "quick-xml", - "reqsign", + "reqsign-core", + "reqsign-file-read-tokio", + "reqsign-http-send-reqwest", + "reqsign-huaweicloud-obs", "serde", "tokio", ] @@ -8637,7 +8640,6 @@ dependencies = [ "http 1.4.0", "jsonwebtoken", "log", - "once_cell", "percent-encoding", "rand 0.8.5", "reqwest", @@ -8711,9 +8713,9 @@ dependencies = [ [[package]] name = "reqsign-file-read-tokio" -version = "2.0.1" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "669ea66036266a9ac371d2e63cc7d345e69994da0168b4e6f3487fe21e126f76" +checksum = "702f12a867bf8e507de907fa0f4d75b96469ace7edd33fcc1fc8a8ef58f3c8d2" dependencies = [ "anyhow", "async-trait", @@ -8738,6 +8740,20 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "reqsign-huaweicloud-obs" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ef663039ba605fb73ca2837215ed082fc264fd815395c72fa93b31c46a32081" +dependencies = [ + "anyhow", + "async-trait", + "http 1.4.0", + "log", + "percent-encoding", + "reqsign-core", +] + [[package]] name = "reqwest" version = "0.12.24" diff --git a/core/services/obs/Cargo.toml b/core/services/obs/Cargo.toml index 3bd2582c0..3d828df60 100644 --- a/core/services/obs/Cargo.toml +++ b/core/services/obs/Cargo.toml @@ -36,10 +36,10 @@ http = { workspace = true } log = { workspace = true } opendal-core = { path = "../../core", version = "0.55.0", default-features = false } quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] } -reqsign = { workspace = true, features = [ - "services-huaweicloud", - "reqwest_request", -] } +reqsign-core = "2.0.2" +reqsign-file-read-tokio = "2.0.2" +reqsign-http-send-reqwest = "2.0.1" +reqsign-huaweicloud-obs = "2.0.2" serde = { workspace = true, features = ["derive"] } [dev-dependencies] diff --git a/core/services/obs/src/backend.rs b/core/services/obs/src/backend.rs index a4aef9b4f..42bcbe2ba 100644 --- a/core/services/obs/src/backend.rs +++ b/core/services/obs/src/backend.rs @@ -23,9 +23,17 @@ use http::Response; use http::StatusCode; use http::Uri; use log::debug; -use reqsign::HuaweicloudObsConfig; -use reqsign::HuaweicloudObsCredentialLoader; -use reqsign::HuaweicloudObsSigner; +use opendal_core::raw::*; +use opendal_core::*; +use reqsign_core::Context; +use reqsign_core::OsEnv; +use reqsign_core::ProvideCredentialChain; +use reqsign_core::Signer; +use reqsign_file_read_tokio::TokioFileRead; +use reqsign_http_send_reqwest::ReqwestHttpSend; +use reqsign_huaweicloud_obs::EnvCredentialProvider; +use reqsign_huaweicloud_obs::RequestSigner; +use reqsign_huaweicloud_obs::StaticCredentialProvider; use super::OBS_SCHEME; use super::config::ObsConfig; @@ -36,8 +44,6 @@ use super::error::parse_error; use super::lister::ObsLister; use super::writer::ObsWriter; use super::writer::ObsWriters; -use opendal_core::raw::*; -use opendal_core::*; /// Huawei-Cloud Object Storage Service (OBS) support #[doc = include_str!("docs.md")] @@ -169,20 +175,18 @@ impl Builder for ObsBuilder { }; debug!("backend use endpoint {}", &endpoint); - let mut cfg = HuaweicloudObsConfig::default(); - // Load cfg from env first. - cfg = cfg.from_env(); + let ctx = Context::new() + .with_file_read(TokioFileRead) + .with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone())) + .with_env(OsEnv); - if let Some(v) = self.config.access_key_id { - cfg.access_key_id = Some(v); - } + let mut provider = ProvideCredentialChain::new().push(EnvCredentialProvider::new()); - if let Some(v) = self.config.secret_access_key { - cfg.secret_access_key = Some(v); + if let (Some(ak), Some(sk)) = (&self.config.access_key_id, &self.config.secret_access_key) { + let static_provider = StaticCredentialProvider::new(ak, sk); + provider = provider.push_front(static_provider); } - let loader = HuaweicloudObsCredentialLoader::new(cfg); - // Set the bucket name in CanonicalizedResource. // 1. If the bucket is bound to a user domain name, use the user domain name as the bucket name, // for example, `/obs.ccc.com/object`. `obs.ccc.com` is the user domain name bound to the bucket. @@ -190,7 +194,8 @@ impl Builder for ObsBuilder { // // Please refer to this doc for more details: // https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0010.html - let signer = HuaweicloudObsSigner::new(if is_obs_default { &bucket } else { &endpoint }); + let request_signer = RequestSigner::new(if is_obs_default { &bucket } else { &endpoint }); + let signer = Signer::new(ctx, provider, request_signer); debug!("backend build finished"); Ok(ObsBackend { @@ -252,7 +257,6 @@ impl Builder for ObsBuilder { root, endpoint: format!("{}://{}", &scheme, &endpoint), signer, - loader, }), }) } @@ -390,8 +394,8 @@ impl Access for ObsBackend { "operation is not supported", )), }; - let mut req = req?; - self.core.sign_query(&mut req, args.expire()).await?; + let req = req?; + let req = self.core.sign_query(req, args.expire()).await?; // We don't need this request anymore, consume it directly. let (parts, _) = req.into_parts(); diff --git a/core/services/obs/src/core.rs b/core/services/obs/src/core.rs index 2b754ff70..722ef0054 100644 --- a/core/services/obs/src/core.rs +++ b/core/services/obs/src/core.rs @@ -27,14 +27,12 @@ use http::header::CONTENT_LENGTH; use http::header::CONTENT_TYPE; use http::header::IF_MATCH; use http::header::IF_NONE_MATCH; -use reqsign::HuaweicloudObsCredential; -use reqsign::HuaweicloudObsCredentialLoader; -use reqsign::HuaweicloudObsSigner; -use serde::Deserialize; -use serde::Serialize; - use opendal_core::raw::*; use opendal_core::*; +use reqsign_core::Signer; +use reqsign_huaweicloud_obs::Credential; +use serde::Deserialize; +use serde::Serialize; pub mod constants { pub const X_OBS_META_PREFIX: &str = "x-obs-meta-"; @@ -47,8 +45,7 @@ pub struct ObsCore { pub root: String, pub endpoint: String, - pub signer: HuaweicloudObsSigner, - pub loader: HuaweicloudObsCredentialLoader, + pub signer: Signer<Credential>, } impl Debug for ObsCore { @@ -62,40 +59,26 @@ impl Debug for ObsCore { } impl ObsCore { - async fn load_credential(&self) -> Result<Option<HuaweicloudObsCredential>> { - let cred = self - .loader - .load() - .await - .map_err(new_request_credential_error)?; - - if let Some(cred) = cred { - Ok(Some(cred)) - } else { - Ok(None) - } - } + pub async fn sign<T>(&self, req: Request<T>) -> Result<Request<T>> { + let (mut parts, body) = req.into_parts(); - pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> { - let cred = if let Some(cred) = self.load_credential().await? { - cred - } else { - return Ok(()); - }; + self.signer + .sign(&mut parts, None) + .await + .map_err(|e| new_request_sign_error(e.into()))?; - self.signer.sign(req, &cred).map_err(new_request_sign_error) + Ok(Request::from_parts(parts, body)) } - pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> { - let cred = if let Some(cred) = self.load_credential().await? { - cred - } else { - return Ok(()); - }; + pub async fn sign_query<T>(&self, req: Request<T>, duration: Duration) -> Result<Request<T>> { + let (mut parts, body) = req.into_parts(); self.signer - .sign_query(req, duration, &cred) - .map_err(new_request_sign_error) + .sign(&mut parts, Some(duration)) + .await + .map_err(|e| new_request_sign_error(e.into()))?; + + Ok(Request::from_parts(parts, body)) } #[inline] @@ -111,9 +94,9 @@ impl ObsCore { range: BytesRange, args: &OpRead, ) -> Result<Response<HttpBody>> { - let mut req = self.obs_get_object_request(path, range, args)?; + let req = self.obs_get_object_request(path, range, args)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.info.http_client().fetch(req).await } @@ -190,9 +173,9 @@ impl ObsCore { } pub async fn obs_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> { - let mut req = self.obs_head_object_request(path, args)?; + let req = self.obs_head_object_request(path, args)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -230,12 +213,12 @@ impl ObsCore { let req = Request::delete(&url); - let mut req = req + let req = req .extension(Operation::Delete) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -287,13 +270,13 @@ impl ObsCore { let source = format!("/{}/{}", self.bucket, percent_encode_path(&source)); let url = format!("{}/{}", self.endpoint, percent_encode_path(&target)); - let mut req = Request::put(&url) + let req = Request::put(&url) .extension(Operation::Copy) .header("x-obs-copy-source", &source) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -321,12 +304,12 @@ impl ObsCore { url = url.push("marker", next_marker); } - let mut req = Request::get(url.finish()) + let req = Request::get(url.finish()) .extension(Operation::List) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -344,12 +327,12 @@ impl ObsCore { req = req.header(CONTENT_TYPE, mime) } - let mut req = req + let req = req .extension(Operation::Write) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -377,13 +360,13 @@ impl ObsCore { req = req.header(CONTENT_LENGTH, size); } - let mut req = req + let req = req .extension(Operation::Write) // Set body .body(body) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -413,12 +396,12 @@ impl ObsCore { // Set content-type to `application/xml` to avoid mixed with form post. let req = req.header(CONTENT_TYPE, "application/xml"); - let mut req = req + let req = req .extension(Operation::Write) .body(Buffer::from(Bytes::from(content))) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -437,12 +420,12 @@ impl ObsCore { percent_encode_path(upload_id) ); - let mut req = Request::delete(&url) + let req = Request::delete(&url) .extension(Operation::Write) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } } @@ -500,7 +483,7 @@ pub struct CompleteMultipartUploadRequestPart { /// Output of `CompleteMultipartUpload` operation #[derive(Debug, Default, Deserialize)] -#[serde[default, rename_all = "PascalCase"]] +#[serde(default, rename_all = "PascalCase")] pub struct CompleteMultipartUploadResult { pub location: String, pub bucket: String, diff --git a/core/services/obs/src/writer.rs b/core/services/obs/src/writer.rs index 6deaf6462..1f12fd8a4 100644 --- a/core/services/obs/src/writer.rs +++ b/core/services/obs/src/writer.rs @@ -21,12 +21,11 @@ use bytes::Buf; use http::HeaderMap; use http::HeaderValue; use http::StatusCode; +use opendal_core::raw::*; +use opendal_core::*; use super::core::*; use super::error::parse_error; -use opendal_core::raw::oio::MultipartPart; -use opendal_core::raw::*; -use opendal_core::*; pub type ObsWriters = TwoWays<oio::MultipartWriter<ObsWriter>, oio::AppendWriter<ObsWriter>>; @@ -64,11 +63,11 @@ impl ObsWriter { impl oio::MultipartWrite for ObsWriter { async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> { - let mut req = self + let req = self .core .obs_put_object_request(&self.path, Some(size), &self.op, body)?; - self.core.sign(&mut req).await?; + let req = self.core.sign(req).await?; let resp = self.core.send(req).await?; @@ -110,7 +109,7 @@ impl oio::MultipartWrite for ObsWriter { part_number: usize, size: u64, body: Buffer, - ) -> Result<MultipartPart> { + ) -> Result<oio::MultipartPart> { // Obs service requires part number must between [1..=10000] let part_number = part_number + 1; @@ -132,7 +131,7 @@ impl oio::MultipartWrite for ObsWriter { })? .to_string(); - Ok(MultipartPart { + Ok(oio::MultipartPart { part_number, etag, checksum: None, @@ -142,7 +141,11 @@ impl oio::MultipartWrite for ObsWriter { } } - async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) -> Result<Metadata> { + async fn complete_part( + &self, + upload_id: &str, + parts: &[oio::MultipartPart], + ) -> Result<Metadata> { let parts = parts .iter() .map(|p| CompleteMultipartUploadRequestPart { @@ -209,11 +212,11 @@ impl oio::AppendWrite for ObsWriter { } async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> { - let mut req = self + let req = self .core .obs_append_object_request(&self.path, offset, size, &self.op, body)?; - self.core.sign(&mut req).await?; + let req = self.core.sign(req).await?; let resp = self.core.send(req).await?;
