This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch 6560-reqsign-core-v1-s3
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 7b9dab80f528542d8c8cb28002ea7e13cde0557e
Author: Xuanwo <[email protected]>
AuthorDate: Mon Oct 13 13:03:32 2025 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/Cargo.lock                 | 111 +++++++++++++----
 core/Cargo.toml                 |  12 +-
 core/src/services/s3/backend.rs | 264 +++++++++++++++++-----------------------
 core/src/services/s3/core.rs    | 160 ++++++++++--------------
 core/src/services/s3/writer.rs  |  12 +-
 5 files changed, 272 insertions(+), 287 deletions(-)

diff --git a/core/Cargo.lock b/core/Cargo.lock
index 7d1d09700..c88d54cb6 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -4440,7 +4440,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "6a793df0d7afeac54f95b471d3af7f0d4fb975699f972341a4b76988d49cdf0c"
 dependencies = [
  "cfg-if",
- "windows-targets 0.53.0",
+ "windows-targets 0.53.3",
 ]
 
 [[package]]
@@ -5360,11 +5360,15 @@ dependencies = [
  "prometheus 0.14.0",
  "prometheus-client",
  "prost 0.13.5",
- "quick-xml 0.38.3",
+ "quick-xml",
  "rand 0.8.5",
  "redb",
  "redis",
  "reqsign",
+ "reqsign-aws-v4",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "reqsign-http-send-reqwest",
  "reqwest",
  "rocksdb",
  "rustls-native-certs 0.8.1",
@@ -6513,16 +6517,6 @@ dependencies = [
  "winapi",
 ]
 
-[[package]]
-name = "quick-xml"
-version = "0.37.5"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb"
-dependencies = [
- "memchr",
- "serde",
-]
-
 [[package]]
 name = "quick-xml"
 version = "0.38.3"
@@ -6941,18 +6935,86 @@ dependencies = [
  "log",
  "once_cell",
  "percent-encoding",
- "quick-xml 0.37.5",
  "rand 0.8.5",
  "reqwest",
  "rsa",
+ "serde",
+ "serde_json",
+ "sha1",
+ "sha2",
+]
+
+[[package]]
+name = "reqsign-aws-v4"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c50993dfb45a89b82dba66b2251984baad70e1b3c502db980f077f095615a26e"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "bytes",
+ "form_urlencoded",
+ "http 1.3.1",
+ "log",
+ "percent-encoding",
+ "quick-xml",
+ "reqsign-core",
  "rust-ini",
  "serde",
  "serde_json",
+ "serde_urlencoded",
+ "sha1",
+]
+
+[[package]]
+name = "reqsign-core"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8f2f07d63648c81c8dbccc19e8e10ef8d57daafb8174e4c2a75f14f33fe8c5ec"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "base64 0.22.1",
+ "bytes",
+ "form_urlencoded",
+ "hex",
+ "hmac",
+ "http 1.3.1",
+ "jiff",
+ "log",
+ "percent-encoding",
  "sha1",
  "sha2",
+ "windows-sys 0.61.2",
+]
+
+[[package]]
+name = "reqsign-file-read-tokio"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "262eb485bb6e8213b13ef10e86ef8613539fb03daa2123b57d96675f784b15b6"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "reqsign-core",
  "tokio",
 ]
 
+[[package]]
+name = "reqsign-http-send-reqwest"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5ff9bb6507b23175dbda8a91ae1a0ad2317471f6ee117e500d1cf6b9ed1eeb0b"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "bytes",
+ "http 1.3.1",
+ "http-body-util",
+ "reqsign-core",
+ "reqwest",
+]
+
 [[package]]
 name = "reqwest"
 version = "0.12.23"
@@ -9702,7 +9764,7 @@ dependencies = [
  "windows-collections",
  "windows-core 0.61.2",
  "windows-future",
- "windows-link 0.1.1",
+ "windows-link 0.1.3",
  "windows-numerics",
 ]
 
@@ -9735,7 +9797,7 @@ checksum = 
"c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
 dependencies = [
  "windows-implement 0.60.0",
  "windows-interface 0.59.1",
- "windows-link 0.1.1",
+ "windows-link 0.1.3",
  "windows-result 0.3.4",
  "windows-strings",
 ]
@@ -9747,7 +9809,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e"
 dependencies = [
  "windows-core 0.61.2",
- "windows-link 0.1.1",
+ "windows-link 0.1.3",
  "windows-threading",
 ]
 
@@ -9797,9 +9859,9 @@ dependencies = [
 
 [[package]]
 name = "windows-link"
-version = "0.1.1"
+version = "0.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38"
+checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
 
 [[package]]
 name = "windows-link"
@@ -9814,7 +9876,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1"
 dependencies = [
  "windows-core 0.61.2",
- "windows-link 0.1.1",
+ "windows-link 0.1.3",
 ]
 
 [[package]]
@@ -9832,7 +9894,7 @@ version = "0.3.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
 dependencies = [
- "windows-link 0.1.1",
+ "windows-link 0.1.3",
 ]
 
 [[package]]
@@ -9841,7 +9903,7 @@ version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
 dependencies = [
- "windows-link 0.1.1",
+ "windows-link 0.1.3",
 ]
 
 [[package]]
@@ -9913,10 +9975,11 @@ dependencies = [
 
 [[package]]
 name = "windows-targets"
-version = "0.53.0"
+version = "0.53.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b1e4c7e8ceaaf9cb7d7507c974735728ab453b67ef8f18febdd7c11fe59dca8b"
+checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91"
 dependencies = [
+ "windows-link 0.1.3",
  "windows_aarch64_gnullvm 0.53.0",
  "windows_aarch64_msvc 0.53.0",
  "windows_i686_gnu 0.53.0",
@@ -9933,7 +9996,7 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6"
 dependencies = [
- "windows-link 0.1.1",
+ "windows-link 0.1.3",
 ]
 
 [[package]]
diff --git a/core/Cargo.toml b/core/Cargo.toml
index cede6ede5..8f4a4cafa 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -199,9 +199,10 @@ services-redis = ["dep:redis", "dep:bb8", 
"redis?/tokio-rustls-comp"]
 services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
 services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"]
 services-s3 = [
-  "dep:reqsign",
-  "reqsign?/services-aws",
-  "reqsign?/reqwest_request",
+  "dep:reqsign-core",
+  "dep:reqsign-aws-v4",
+  "dep:reqsign-file-read-tokio",
+  "dep:reqsign-http-send-reqwest",
   "dep:crc32c",
 ]
 services-seafile = []
@@ -279,6 +280,11 @@ sqlx = { version = "0.8.0", features = [
 
 # For http based services.
 reqsign = { version = "0.16.5", default-features = false, optional = true }
+# For S3 service migration to v1
+reqsign-core = { version = "2.0", default-features = false, optional = true }
+reqsign-aws-v4 = { version = "2.0", default-features = false, optional = true }
+reqsign-file-read-tokio = { version = "2.0", default-features = false, 
optional = true }
+reqsign-http-send-reqwest = { version = "2.0", default-features = false, 
optional = true }
 
 # for self-referencing structs
 ouroboros = { version = "0.18.4", optional = true }
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 4570ce6b0..d0cb7a695 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -22,7 +22,6 @@ use std::fmt::Write;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::sync::LazyLock;
-use std::sync::atomic::AtomicBool;
 
 use base64::Engine;
 use base64::prelude::BASE64_STANDARD;
@@ -36,11 +35,16 @@ use log::warn;
 use md5::Digest;
 use md5::Md5;
 use percent_encoding::percent_decode_str;
-use reqsign::AwsAssumeRoleLoader;
-use reqsign::AwsConfig;
-use reqsign::AwsCredentialLoad;
-use reqsign::AwsDefaultLoader;
-use reqsign::AwsV4Signer;
+use reqsign_aws_v4::Credential;
+use reqsign_aws_v4::DefaultCredentialProvider;
+use reqsign_aws_v4::RequestSigner as AwsV4Signer;
+use reqsign_aws_v4::StaticCredentialProvider;
+use reqsign_core::Context;
+use reqsign_core::ProvideCredential;
+use reqsign_core::ProvideCredentialChain;
+use reqsign_core::Signer;
+use reqsign_file_read_tokio::TokioFileRead;
+use reqsign_http_send_reqwest::ReqwestHttpSend;
 use reqwest::Url;
 
 use super::S3_SCHEME;
@@ -102,9 +106,9 @@ impl Configurator for S3Config {
     fn into_builder(self) -> Self::Builder {
         S3Builder {
             config: self,
-            customized_credential_load: None,
 
             http_client: None,
+            credential_providers: None,
         }
     }
 }
@@ -117,10 +121,9 @@ impl Configurator for S3Config {
 pub struct S3Builder {
     config: S3Config,
 
-    customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,
-
     #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` 
instead")]
     http_client: Option<HttpClient>,
+    credential_providers: Option<ProvideCredentialChain<Credential>>,
 }
 
 impl Debug for S3Builder {
@@ -492,15 +495,6 @@ impl S3Builder {
         self
     }
 
-    /// Adding a customized credential load for service.
-    ///
-    /// If customized_credential_load has been set, we will ignore all other
-    /// credential load methods.
-    pub fn customized_credential_load(mut self, cred: Box<dyn 
AwsCredentialLoad>) -> Self {
-        self.customized_credential_load = Some(cred);
-        self
-    }
-
     /// Specify the http client that used by this service.
     ///
     /// # Notes
@@ -521,31 +515,37 @@ impl S3Builder {
         self
     }
 
-    /// Check if `bucket` is valid
+    /// Replace the credential providers with a custom chain.
+    pub fn credential_provider_chain(mut self, chain: 
ProvideCredentialChain<Credential>) -> Self {
+        self.credential_providers = Some(chain);
+        self
+    }
+
+    /// Check if `bucket` is valid.
     /// `bucket` must be not empty and if `enable_virtual_host_style` is true
-    /// it couldn't contain dot(.) character
-    fn is_bucket_valid(&self) -> bool {
-        if self.config.bucket.is_empty() {
+    /// it could not contain dot (.) character.
+    fn is_bucket_valid(config: &S3Config) -> bool {
+        if config.bucket.is_empty() {
             return false;
         }
         // If enable virtual host style, `bucket` will reside in domain part,
         // for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
         // so `bucket` with dot can't be recognized correctly for this format.
-        if self.config.enable_virtual_host_style && 
self.config.bucket.contains('.') {
+        if config.enable_virtual_host_style && config.bucket.contains('.') {
             return false;
         }
         true
     }
 
     /// Build endpoint with given region.
-    fn build_endpoint(&self, region: &str) -> String {
+    fn build_endpoint(config: &S3Config, region: &str) -> String {
         let bucket = {
-            debug_assert!(self.is_bucket_valid(), "bucket must be valid");
+            debug_assert!(Self::is_bucket_valid(config), "bucket must be 
valid");
 
-            self.config.bucket.as_str()
+            config.bucket.as_str()
         };
 
-        let mut endpoint = match &self.config.endpoint {
+        let mut endpoint = match &config.endpoint {
             Some(endpoint) => {
                 if endpoint.starts_with("http") {
                     endpoint.to_string()
@@ -576,7 +576,7 @@ impl S3Builder {
         };
 
         // Apply virtual host style.
-        if self.config.enable_virtual_host_style {
+        if config.enable_virtual_host_style {
             endpoint = endpoint.replace("//", &format!("//{bucket}."))
         } else {
             write!(endpoint, "/{bucket}").expect("write into string must 
succeed");
@@ -745,15 +745,22 @@ impl S3Builder {
 impl Builder for S3Builder {
     type Config = S3Config;
 
-    fn build(mut self) -> Result<impl Access> {
+    fn build(self) -> Result<impl Access> {
         debug!("backend build started: {:?}", &self);
 
-        let root = 
normalize_root(&self.config.root.clone().unwrap_or_default());
+        #[allow(deprecated)]
+        let S3Builder {
+            config,
+            http_client,
+            credential_providers,
+        } = self;
+
+        let root = normalize_root(&config.root.clone().unwrap_or_default());
         debug!("backend use root {}", &root);
 
         // Handle bucket name.
-        let bucket = if self.is_bucket_valid() {
-            Ok(&self.config.bucket)
+        let bucket = if Self::is_bucket_valid(&config) {
+            Ok(&config.bucket)
         } else {
             Err(
                 Error::new(ErrorKind::ConfigInvalid, "The bucket is 
misconfigured")
@@ -762,14 +769,14 @@ impl Builder for S3Builder {
         }?;
         debug!("backend use bucket {}", &bucket);
 
-        let default_storage_class = match &self.config.default_storage_class {
+        let default_storage_class = match &config.default_storage_class {
             None => None,
             Some(v) => Some(
                 build_header_value(v).map_err(|err| err.with_context("key", 
"storage_class"))?,
             ),
         };
 
-        let server_side_encryption = match &self.config.server_side_encryption 
{
+        let server_side_encryption = match &config.server_side_encryption {
             None => None,
             Some(v) => Some(
                 build_header_value(v)
@@ -778,7 +785,7 @@ impl Builder for S3Builder {
         };
 
         let server_side_encryption_aws_kms_key_id =
-            match &self.config.server_side_encryption_aws_kms_key_id {
+            match &config.server_side_encryption_aws_kms_key_id {
                 None => None,
                 Some(v) => Some(build_header_value(v).map_err(|err| {
                     err.with_context("key", 
"server_side_encryption_aws_kms_key_id")
@@ -786,7 +793,7 @@ impl Builder for S3Builder {
             };
 
         let server_side_encryption_customer_algorithm =
-            match &self.config.server_side_encryption_customer_algorithm {
+            match &config.server_side_encryption_customer_algorithm {
                 None => None,
                 Some(v) => Some(build_header_value(v).map_err(|err| {
                     err.with_context("key", 
"server_side_encryption_customer_algorithm")
@@ -794,7 +801,7 @@ impl Builder for S3Builder {
             };
 
         let server_side_encryption_customer_key =
-            match &self.config.server_side_encryption_customer_key {
+            match &config.server_side_encryption_customer_key {
                 None => None,
                 Some(v) => Some(build_header_value(v).map_err(|err| {
                     err.with_context("key", 
"server_side_encryption_customer_key")
@@ -802,14 +809,14 @@ impl Builder for S3Builder {
             };
 
         let server_side_encryption_customer_key_md5 =
-            match &self.config.server_side_encryption_customer_key_md5 {
+            match &config.server_side_encryption_customer_key_md5 {
                 None => None,
                 Some(v) => Some(build_header_value(v).map_err(|err| {
                     err.with_context("key", 
"server_side_encryption_customer_key_md5")
                 })?),
             };
 
-        let checksum_algorithm = match 
self.config.checksum_algorithm.as_deref() {
+        let checksum_algorithm = match config.checksum_algorithm.as_deref() {
             Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
             None => None,
             v => {
@@ -820,109 +827,65 @@ impl Builder for S3Builder {
             }
         };
 
-        // This is our current config.
-        let mut cfg = AwsConfig::default();
-        if !self.config.disable_config_load {
-            #[cfg(not(target_arch = "wasm32"))]
-            {
-                cfg = cfg.from_profile();
-                cfg = cfg.from_env();
-            }
-        }
-
-        if let Some(ref v) = self.config.region {
-            cfg.region = Some(v.to_string());
-        }
-
-        if cfg.region.is_none() {
-            return Err(Error::new(
-                ErrorKind::ConfigInvalid,
-                "region is missing. Please find it by S3::detect_region() or 
set them in env.",
-            )
-            .with_operation("Builder::build")
-            .with_context("service", Scheme::S3));
-        }
-
-        let region = cfg.region.to_owned().unwrap();
+        // Determine the region
+        let region = if let Some(ref v) = config.region {
+            v.to_string()
+        } else {
+            // Try to get region from environment
+            std::env::var("AWS_REGION")
+                .or_else(|_| std::env::var("AWS_DEFAULT_REGION"))
+                .map_err(|_| {
+                    Error::new(
+                        ErrorKind::ConfigInvalid,
+                        "region is missing. Please find it by 
S3::detect_region() or set them in env.",
+                    )
+                    .with_operation("Builder::build")
+                    .with_context("service", Scheme::S3)
+                })?
+        };
         debug!("backend use region: {region}");
 
-        // Retain the user's endpoint if it exists; otherwise, try loading it 
from the environment.
-        self.config.endpoint = self.config.endpoint.or_else(|| 
cfg.endpoint_url.clone());
-
         // Building endpoint.
-        let endpoint = self.build_endpoint(&region);
+        let endpoint = Self::build_endpoint(&config, &region);
         debug!("backend use endpoint: {endpoint}");
 
-        // Setting all value from user input if available.
-        if let Some(v) = self.config.access_key_id {
-            cfg.access_key_id = Some(v)
-        }
-        if let Some(v) = self.config.secret_access_key {
-            cfg.secret_access_key = Some(v)
-        }
-        if let Some(v) = self.config.session_token {
-            cfg.session_token = Some(v)
-        }
+        // Create the context for reqsign-core
+        let ctx = Context::new()
+            .with_file_read(TokioFileRead)
+            
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()));
 
-        let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
-        // If customized_credential_load is set, we will use it.
-        if let Some(v) = self.customized_credential_load {
-            loader = Some(v);
-        }
+        let mut provider = if let Some(chain) = credential_providers {
+            chain
+        } else {
+            let mut builder = DefaultCredentialProvider::builder();
 
-        // If role_arn is set, we must use AssumeRoleLoad.
-        if let Some(role_arn) = self.config.role_arn {
-            // use current env as source credential loader.
-            let default_loader =
-                AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), 
cfg.clone());
-
-            // Build the config for assume role.
-            let mut assume_role_cfg = AwsConfig {
-                region: Some(region.clone()),
-                role_arn: Some(role_arn),
-                external_id: self.config.external_id.clone(),
-                sts_regional_endpoints: "regional".to_string(),
-                ..Default::default()
-            };
+            if config.disable_config_load {
+                builder = builder.disable_env(true).disable_profile(true);
+            }
 
-            // override default role_session_name if set
-            if let Some(name) = self.config.role_session_name {
-                assume_role_cfg.role_session_name = name;
+            if config.disable_ec2_metadata {
+                builder = builder.disable_imds(true);
             }
 
-            let assume_role_loader = AwsAssumeRoleLoader::new(
-                GLOBAL_REQWEST_CLIENT.clone().clone(),
-                assume_role_cfg,
-                Box::new(default_loader),
-            )
-            .map_err(|err| {
-                Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "The assume_role_loader is misconfigured",
-                )
-                .with_context("service", Scheme::S3)
-                .set_source(err)
-            })?;
-            loader = Some(Box::new(assume_role_loader));
+            ProvideCredentialChain::new().push(builder.build())
+        };
+
+        if let (Some(ak), Some(sk)) = (&config.access_key_id, 
&config.secret_access_key) {
+            let static_provider = if let Some(token) = 
config.session_token.as_deref() {
+                StaticCredentialProvider::new(ak, sk).with_session_token(token)
+            } else {
+                StaticCredentialProvider::new(ak, sk)
+            };
+            provider = provider.push_front(static_provider);
         }
-        // If loader is not set, we will use default loader.
-        let loader = match loader {
-            Some(v) => v,
-            None => {
-                let mut default_loader =
-                    
AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
-                if self.config.disable_ec2_metadata {
-                    default_loader = 
default_loader.with_disable_ec2_metadata();
-                }
 
-                Box::new(default_loader)
-            }
-        };
+        // Create request signer for S3
+        let request_signer = AwsV4Signer::new("s3", &region);
 
-        let signer = AwsV4Signer::new("s3", &region);
+        // Create the signer
+        let signer = Signer::new(ctx, provider, request_signer);
 
-        let delete_max_size = self
-            .config
+        let delete_max_size = config
             .delete_max_size
             .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
 
@@ -939,16 +902,11 @@ impl Builder for S3Builder {
                             stat_with_if_none_match: true,
                             stat_with_if_modified_since: true,
                             stat_with_if_unmodified_since: true,
-                            stat_with_override_cache_control: !self
-                                .config
-                                .disable_stat_with_override,
-                            stat_with_override_content_disposition: !self
-                                .config
-                                .disable_stat_with_override,
-                            stat_with_override_content_type: !self
-                                .config
+                            stat_with_override_cache_control: 
!config.disable_stat_with_override,
+                            stat_with_override_content_disposition: !config
                                 .disable_stat_with_override,
-                            stat_with_version: self.config.enable_versioning,
+                            stat_with_override_content_type: 
!config.disable_stat_with_override,
+                            stat_with_version: config.enable_versioning,
 
                             read: true,
                             read_with_if_match: true,
@@ -958,17 +916,17 @@ impl Builder for S3Builder {
                             read_with_override_cache_control: true,
                             read_with_override_content_disposition: true,
                             read_with_override_content_type: true,
-                            read_with_version: self.config.enable_versioning,
+                            read_with_version: config.enable_versioning,
 
                             write: true,
                             write_can_empty: true,
                             write_can_multi: true,
-                            write_can_append: 
self.config.enable_write_with_append,
+                            write_can_append: config.enable_write_with_append,
 
                             write_with_cache_control: true,
                             write_with_content_type: true,
                             write_with_content_encoding: true,
-                            write_with_if_match: 
!self.config.disable_write_with_if_match,
+                            write_with_if_match: 
!config.disable_write_with_if_match,
                             write_with_if_not_exists: true,
                             write_with_user_metadata: true,
 
@@ -987,7 +945,7 @@ impl Builder for S3Builder {
 
                             delete: true,
                             delete_max_size: Some(delete_max_size),
-                            delete_with_version: self.config.enable_versioning,
+                            delete_with_version: config.enable_versioning,
 
                             copy: true,
 
@@ -995,8 +953,8 @@ impl Builder for S3Builder {
                             list_with_limit: true,
                             list_with_start_after: true,
                             list_with_recursive: true,
-                            list_with_versions: self.config.enable_versioning,
-                            list_with_deleted: self.config.enable_versioning,
+                            list_with_versions: config.enable_versioning,
+                            list_with_deleted: config.enable_versioning,
 
                             presign: true,
                             presign_stat: true,
@@ -1010,7 +968,7 @@ impl Builder for S3Builder {
 
                     // allow deprecated api here for compatibility
                     #[allow(deprecated)]
-                    if let Some(client) = self.http_client {
+                    if let Some(client) = http_client {
                         am.update_http_client(|_| client);
                     }
 
@@ -1025,12 +983,10 @@ impl Builder for S3Builder {
                 server_side_encryption_customer_key,
                 server_side_encryption_customer_key_md5,
                 default_storage_class,
-                allow_anonymous: self.config.allow_anonymous,
-                disable_list_objects_v2: self.config.disable_list_objects_v2,
-                enable_request_payer: self.config.enable_request_payer,
+                allow_anonymous: config.allow_anonymous,
+                disable_list_objects_v2: config.disable_list_objects_v2,
+                enable_request_payer: config.enable_request_payer,
                 signer,
-                loader,
-                credential_loaded: AtomicBool::new(false),
                 checksum_algorithm,
             }),
         })
@@ -1170,9 +1126,9 @@ impl Access for S3Backend {
                 "operation is not supported",
             )),
         };
-        let mut req = req?;
+        let req = req?;
 
-        self.core.sign_query(&mut req, expire).await?;
+        let req = self.core.sign_query(req, expire).await?;
 
         // We don't need this request anymore, consume it directly.
         let (parts, _) = req.into_parts();
@@ -1206,7 +1162,7 @@ mod tests {
             if enable_virtual_host_style {
                 b = b.enable_virtual_host_style();
             }
-            assert_eq!(b.is_bucket_valid(), expected)
+            assert_eq!(S3Builder::is_bucket_valid(&b.config), expected)
         }
     }
 
@@ -1227,7 +1183,7 @@ mod tests {
                 b = b.endpoint(endpoint);
             }
 
-            let endpoint = b.build_endpoint("us-east-2");
+            let endpoint = S3Builder::build_endpoint(&b.config, "us-east-2");
             assert_eq!(endpoint, "https://s3.us-east-2.amazonaws.com/test";);
         }
 
@@ -1239,7 +1195,7 @@ mod tests {
                 b = b.endpoint(endpoint);
             }
 
-            let endpoint = b.build_endpoint("us-east-2");
+            let endpoint = S3Builder::build_endpoint(&b.config, "us-east-2");
             assert_eq!(endpoint, "https://test.s3.us-east-2.amazonaws.com";);
         }
     }
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 22e7d1f57..d063ce921 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -21,8 +21,6 @@ use std::fmt::Display;
 use std::fmt::Formatter;
 use std::fmt::Write;
 use std::sync::Arc;
-use std::sync::atomic;
-use std::sync::atomic::AtomicBool;
 use std::time::Duration;
 
 use base64::Engine;
@@ -43,9 +41,8 @@ use http::header::IF_MATCH;
 use http::header::IF_MODIFIED_SINCE;
 use http::header::IF_NONE_MATCH;
 use http::header::IF_UNMODIFIED_SINCE;
-use reqsign::AwsCredential;
-use reqsign::AwsCredentialLoad;
-use reqsign::AwsV4Signer;
+use reqsign_aws_v4::Credential;
+use reqsign_core::Signer;
 use serde::Deserialize;
 use serde::Serialize;
 
@@ -104,9 +101,7 @@ pub struct S3Core {
     pub disable_list_objects_v2: bool,
     pub enable_request_payer: bool,
 
-    pub signer: AwsV4Signer,
-    pub loader: Box<dyn AwsCredentialLoad>,
-    pub credential_loaded: AtomicBool,
+    pub signer: Signer<Credential>,
     pub checksum_algorithm: Option<ChecksumAlgorithm>,
 }
 
@@ -121,52 +116,43 @@ impl Debug for S3Core {
 }
 
 impl S3Core {
-    /// If credential is not found, we will not sign the request.
-    async fn load_credential(&self) -> Result<Option<AwsCredential>> {
-        let cred = self
-            .loader
-            .load_credential(GLOBAL_REQWEST_CLIENT.clone())
+    pub async fn sign_query<T>(&self, req: Request<T>, duration: Duration) -> 
Result<Request<T>> {
+        // Skip signing for anonymous access
+        if self.allow_anonymous {
+            return Ok(req);
+        }
+
+        // Sign the request with presigned URL
+        let (mut parts, body) = req.into_parts();
+
+        self.signer
+            .sign(&mut parts, Some(duration))
             .await
-            .map_err(new_request_credential_error)?;
+            .map_err(|e| new_request_sign_error(e.into()))?;
 
-        if let Some(cred) = cred {
-            // Update credential_loaded to true if we have load credential 
successfully.
-            self.credential_loaded
-                .store(true, atomic::Ordering::Relaxed);
-            return Ok(Some(cred));
-        }
+        // Always remove host header, let users' client to set it based on HTTP
+        // version.
+        //
+        // As discussed in 
<https://github.com/seanmonstar/reqwest/issues/1809>,
+        // google server could send RST_STREAM of PROTOCOL_ERROR if our request
+        // contains host header.
+        parts.headers.remove(HOST);
 
-        // If we have load credential before but failed to load this time, we 
should
-        // return error instead.
-        if self.credential_loaded.load(atomic::Ordering::Relaxed) {
-            return Err(Error::new(
-                ErrorKind::PermissionDenied,
-                "credential was previously loaded successfully but has failed 
this time",
-            )
-            .set_temporary());
-        }
+        Ok(Request::from_parts(parts, body))
+    }
 
-        // Credential is empty and users allow anonymous access, we will not 
sign the request.
+    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> 
{
+        // Skip signing for anonymous access
         if self.allow_anonymous {
-            return Ok(None);
+            return self.info.http_client().send(req).await;
         }
 
-        Err(Error::new(
-            ErrorKind::PermissionDenied,
-            "no valid credential found and anonymous access is not allowed",
-        ))
-    }
-
-    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
-        let cred = if let Some(cred) = self.load_credential().await? {
-            cred
-        } else {
-            return Ok(());
-        };
+        let (mut parts, body) = req.into_parts();
 
         self.signer
-            .sign(req, &cred)
-            .map_err(new_request_sign_error)?;
+            .sign(&mut parts, None)
+            .await
+            .map_err(|e| new_request_sign_error(e.into()))?;
 
         // Always remove host header, let users' client to set it based on HTTP
         // version.
@@ -174,21 +160,26 @@ impl S3Core {
         // As discussed in 
<https://github.com/seanmonstar/reqwest/issues/1809>,
         // google server could send RST_STREAM of PROTOCOL_ERROR if our request
         // contains host header.
-        req.headers_mut().remove(HOST);
+        parts.headers.remove(HOST);
 
-        Ok(())
+        self.info
+            .http_client()
+            .send(Request::from_parts(parts, body))
+            .await
     }
 
-    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: 
Duration) -> Result<()> {
-        let cred = if let Some(cred) = self.load_credential().await? {
-            cred
-        } else {
-            return Ok(());
-        };
+    pub async fn fetch(&self, req: Request<Buffer>) -> 
Result<Response<HttpBody>> {
+        // Skip signing for anonymous access
+        if self.allow_anonymous {
+            return self.info.http_client().fetch(req).await;
+        }
+
+        let (mut parts, body) = req.into_parts();
 
         self.signer
-            .sign_query(req, duration, &cred)
-            .map_err(new_request_sign_error)?;
+            .sign(&mut parts, None)
+            .await
+            .map_err(|e| new_request_sign_error(e.into()))?;
 
         // Always remove host header, let users' client to set it based on HTTP
         // version.
@@ -196,14 +187,12 @@ impl S3Core {
         // As discussed in 
<https://github.com/seanmonstar/reqwest/issues/1809>,
         // google server could send RST_STREAM of PROTOCOL_ERROR if our request
         // contains host header.
-        req.headers_mut().remove(HOST);
+        parts.headers.remove(HOST);
 
-        Ok(())
-    }
-
-    #[inline]
-    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> 
{
-        self.info.http_client().send(req).await
+        self.info
+            .http_client()
+            .fetch(Request::from_parts(parts, body))
+            .await
     }
 
     /// # Note
@@ -529,11 +518,8 @@ impl S3Core {
         range: BytesRange,
         args: &OpRead,
     ) -> Result<Response<HttpBody>> {
-        let mut req = self.s3_get_object_request(path, range, args)?;
-
-        self.sign(&mut req).await?;
-
-        self.info.http_client().fetch(req).await
+        let req = self.s3_get_object_request(path, range, args)?;
+        self.fetch(req).await
     }
 
     pub fn s3_put_object_request(
@@ -610,10 +596,7 @@ impl S3Core {
     }
 
     pub async fn s3_head_object(&self, path: &str, args: OpStat) -> 
Result<Response<Buffer>> {
-        let mut req = self.s3_head_object_request(path, args)?;
-
-        self.sign(&mut req).await?;
-
+        let req = self.s3_head_object_request(path, args)?;
         self.send(req).await
     }
 
@@ -641,14 +624,12 @@ impl S3Core {
         // Set request payer header if enabled.
         req = self.insert_request_payer_header(req);
 
-        let mut req = req
+        let req = req
             // Inject operation to the request.
             .extension(Operation::Delete)
             .body(Buffer::new())
             .map_err(new_request_build_error)?;
 
-        self.sign(&mut req).await?;
-
         self.send(req).await
     }
 
@@ -703,15 +684,13 @@ impl S3Core {
         // Set request payer header if enabled.
         req = self.insert_request_payer_header(req);
 
-        let mut req = req
+        let req = req
             // Inject operation to the request.
             .extension(Operation::Copy)
             .header(constants::X_AMZ_COPY_SOURCE, &source)
             .body(Buffer::new())
             .map_err(new_request_build_error)?;
 
-        self.sign(&mut req).await?;
-
         self.send(req).await
     }
 
@@ -744,14 +723,12 @@ impl S3Core {
         // Set request payer header if enabled.
         req = self.insert_request_payer_header(req);
 
-        let mut req = req
+        let req = req
             // Inject operation to the request.
             .extension(Operation::List)
             .body(Buffer::new())
             .map_err(new_request_build_error)?;
 
-        self.sign(&mut req).await?;
-
         self.send(req).await
     }
 
@@ -796,14 +773,12 @@ impl S3Core {
         // Set request payer header if enabled.
         req = self.insert_request_payer_header(req);
 
-        let mut req = req
+        let req = req
             // Inject operation to the request.
             .extension(Operation::List)
             .body(Buffer::new())
             .map_err(new_request_build_error)?;
 
-        self.sign(&mut req).await?;
-
         self.send(req).await
     }
 
@@ -854,9 +829,7 @@ impl S3Core {
         // Inject operation to the request.
         req = req.extension(Operation::Write);
 
-        let mut req = 
req.body(Buffer::new()).map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
+        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
 
         self.send(req).await
     }
@@ -937,12 +910,10 @@ impl S3Core {
         // Inject operation to the request.
         req = req.extension(Operation::Write);
 
-        let mut req = req
+        let req = req
             .body(Buffer::from(Bytes::from(content)))
             .map_err(new_request_build_error)?;
 
-        self.sign(&mut req).await?;
-
         self.send(req).await
     }
 
@@ -966,13 +937,12 @@ impl S3Core {
         // Set request payer header if enabled.
         req = self.insert_request_payer_header(req);
 
-        let mut req = req
+        let req = req
             // Inject operation to the request.
             .extension(Operation::Write)
             .body(Buffer::new())
             .map_err(new_request_build_error)?;
 
-        self.sign(&mut req).await?;
         self.send(req).await
     }
 
@@ -1008,12 +978,10 @@ impl S3Core {
         // Inject operation to the request.
         req = req.extension(Operation::Delete);
 
-        let mut req = req
+        let req = req
             .body(Buffer::from(Bytes::from(content)))
             .map_err(new_request_build_error)?;
 
-        self.sign(&mut req).await?;
-
         self.send(req).await
     }
 
@@ -1057,14 +1025,12 @@ impl S3Core {
         // Set request payer header if enabled.
         req = self.insert_request_payer_header(req);
 
-        let mut req = req
+        let req = req
             // Inject operation to the request.
             .extension(Operation::List)
             .body(Buffer::new())
             .map_err(new_request_build_error)?;
 
-        self.sign(&mut req).await?;
-
         self.send(req).await
     }
 }
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index 54a1b6e3f..82932cae8 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -66,12 +66,10 @@ impl S3Writer {
 
 impl oio::MultipartWrite for S3Writer {
     async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
-        let mut req = self
+        let req = self
             .core
             .s3_put_object_request(&self.path, Some(size), &self.op, body)?;
 
-        self.core.sign(&mut req).await?;
-
         let resp = self.core.send(req).await?;
 
         let status = resp.status();
@@ -117,7 +115,7 @@ impl oio::MultipartWrite for S3Writer {
 
         let checksum = self.core.calculate_checksum(&body);
 
-        let mut req = self.core.s3_upload_part_request(
+        let req = self.core.s3_upload_part_request(
             &self.path,
             upload_id,
             part_number,
@@ -126,8 +124,6 @@ impl oio::MultipartWrite for S3Writer {
             checksum.clone(),
         )?;
 
-        self.core.sign(&mut req).await?;
-
         let resp = self.core.send(req).await?;
 
         let status = resp.status();
@@ -242,12 +238,10 @@ impl oio::AppendWrite for S3Writer {
     }
 
     async fn append(&self, offset: u64, size: u64, body: Buffer) -> 
Result<Metadata> {
-        let mut req = self
+        let req = self
             .core
             .s3_append_object_request(&self.path, offset, size, &self.op, 
body)?;
 
-        self.core.sign(&mut req).await?;
-
         let resp = self.core.send(req).await?;
 
         let status = resp.status();


Reply via email to