This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 7f7583d89 refactor!: Migrate service s3 to reqsign-core 2.0 (#6656)
7f7583d89 is described below
commit 7f7583d89ccd913e2a469deccc3eb78a898fdefc
Author: Xuanwo <[email protected]>
AuthorDate: Thu Nov 20 18:13:40 2025 +0800
refactor!: Migrate service s3 to reqsign-core 2.0 (#6656)
* Save work
Signed-off-by: Xuanwo <[email protected]>
* Address assume role support
Signed-off-by: Xuanwo <[email protected]>
* Fix
Signed-off-by: Xuanwo <[email protected]>
* debug
Signed-off-by: Xuanwo <[email protected]>
* Fix
Signed-off-by: Xuanwo <[email protected]>
* Add os env support
Signed-off-by: Xuanwo <[email protected]>
* Test
Signed-off-by: Xuanwo <[email protected]>
* Enable reginal sts endpoint
Signed-off-by: Xuanwo <[email protected]>
* Add region
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Enable jiff
Signed-off-by: Xuanwo <[email protected]>
* Update lock
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
.github/workflows/test_edge.yml | 1 +
core/Cargo.lock | 145 ++++++--
core/Cargo.toml | 15 +-
.../Cargo.toml | 1 +
.../src/main.rs | 6 +
core/src/services/s3/backend.rs | 394 +++++++++++++--------
core/src/services/s3/config.rs | 3 +-
core/src/services/s3/core.rs | 160 ++++-----
core/src/services/s3/writer.rs | 12 +-
9 files changed, 439 insertions(+), 298 deletions(-)
diff --git a/.github/workflows/test_edge.yml b/.github/workflows/test_edge.yml
index 1d3ceb4c9..1e5fc3aeb 100644
--- a/.github/workflows/test_edge.yml
+++ b/.github/workflows/test_edge.yml
@@ -138,3 +138,4 @@ jobs:
OPENDAL_S3_BUCKET: opendal-testing
OPENDAL_S3_ROLE_ARN: arn:aws:iam::952853449216:role/opendal-testing
OPENDAL_S3_REGION: ap-northeast-1
+ RUST_LOG: debug
diff --git a/core/Cargo.lock b/core/Cargo.lock
index c52dffc02..76d6c1277 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -588,9 +588,9 @@ checksum =
"d5b3469636cdf8543cceab175efca534471f36eee12fb8374aba00eb5e7e7f8a"
[[package]]
name = "aws-config"
-version = "1.8.10"
+version = "1.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1856b1b48b65f71a4dd940b1c0931f9a7b646d4a924b9828ffefc1454714668a"
+checksum = "a0149602eeaf915158e14029ba0c78dedb8c08d554b024d54c8f239aab46511d"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -618,9 +618,9 @@ dependencies = [
[[package]]
name = "aws-credential-types"
-version = "1.2.9"
+version = "1.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "86590e57ea40121d47d3f2e131bfd873dea15d78dc2f4604f4734537ad9e56c4"
+checksum = "b01c9521fa01558f750d183c8c68c81b0155b9d193a4ba7f84c36bd1b6d04a06"
dependencies = [
"aws-smithy-async",
"aws-smithy-runtime-api",
@@ -653,9 +653,9 @@ dependencies = [
[[package]]
name = "aws-runtime"
-version = "1.5.14"
+version = "1.5.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8fe0fd441565b0b318c76e7206c8d1d0b0166b3e986cf30e890b61feb6192045"
+checksum = "7ce527fb7e53ba9626fc47824f25e256250556c40d8f81d27dd92aa38239d632"
dependencies = [
"aws-credential-types",
"aws-sigv4",
@@ -712,9 +712,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sso"
-version = "1.89.0"
+version = "1.90.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a9c1b1af02288f729e95b72bd17988c009aa72e26dcb59b3200f86d7aea726c9"
+checksum = "4f18e53542c522459e757f81e274783a78f8c81acdfc8d1522ee8a18b5fb1c66"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -734,9 +734,9 @@ dependencies = [
[[package]]
name = "aws-sdk-ssooidc"
-version = "1.91.0"
+version = "1.92.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e8122301558dc7c6c68e878af918880b82ff41897a60c8c4e18e4dc4d93e9f1"
+checksum = "532f4d866012ffa724a4385c82e8dd0e59f0ca0e600f3f22d4c03b6824b34e4a"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -756,9 +756,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sts"
-version = "1.92.0"
+version = "1.94.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a0c7808adcff8333eaa76a849e6de926c6ac1a1268b9fd6afe32de9c29ef29d2"
+checksum = "1be6fbbfa1a57724788853a623378223fe828fc4c09b146c992f0c95b6256174"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -1712,9 +1712,9 @@ dependencies = [
[[package]]
name = "clap"
-version = "4.5.51"
+version = "4.5.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c26d721170e0295f191a69bd9a1f93efcdb0aff38684b61ab5750468972e5f5"
+checksum = "c9e340e012a1bf4935f5282ed1436d1489548e8f72308207ea5df0e23d2d03f8"
dependencies = [
"clap_builder",
"clap_derive",
@@ -1722,9 +1722,9 @@ dependencies = [
[[package]]
name = "clap_builder"
-version = "4.5.51"
+version = "4.5.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "75835f0c7bf681bfd05abe44e965760fea999a5286c6eb2d59883634fd02011a"
+checksum = "d76b5d13eaa18c901fd2f7fca939fefe3a0727a953561fefdf3b2922b8569d00"
dependencies = [
"anstream",
"anstyle",
@@ -1913,9 +1913,9 @@ dependencies = [
[[package]]
name = "compio-runtime"
-version = "0.9.4"
+version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1279eb0b0346610a453b80ca410b914b8ee7baceb9f045a9aa42c440b16b43c0"
+checksum = "b467b43c646a60bae3bb3a55c2c200ea7c4416a63cc5a6070450aa6771b3c62e"
dependencies = [
"async-task",
"cfg-if",
@@ -2664,6 +2664,7 @@ version = "0.54.0"
dependencies = [
"opendal",
"tokio",
+ "tracing-subscriber",
"uuid",
]
@@ -4285,10 +4286,12 @@ checksum =
"49cce2b81f2098e7e3efc35bc2e0a6b7abec9d34128283d7a26fa8f32a6dbb35"
dependencies = [
"jiff-static",
"jiff-tzdb-platform",
+ "js-sys",
"log",
"portable-atomic",
"portable-atomic-util",
"serde_core",
+ "wasm-bindgen",
"windows-sys 0.61.2",
]
@@ -4917,9 +4920,9 @@ dependencies = [
[[package]]
name = "mongocrypt"
-version = "0.3.1"
+version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "22426d6318d19c5c0773f783f85375265d6a8f0fa76a733da8dc4355516ec63d"
+checksum = "8da0cd419a51a5fb44819e290fbdb0665a54f21dead8923446a799c7f4d26ad9"
dependencies = [
"bson",
"mongocrypt-sys",
@@ -4929,9 +4932,9 @@ dependencies = [
[[package]]
name = "mongocrypt-sys"
-version = "0.1.4+1.12.0"
+version = "0.1.5+1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dda42df21d035f88030aad8e877492fac814680e1d7336a57b2a091b989ae388"
+checksum = "224484c5d09285a7b8cb0a0c117e847ebd14cb6e4470ecf68cdb89c503b0edb9"
[[package]]
name = "mongodb"
@@ -4985,9 +4988,9 @@ dependencies = [
[[package]]
name = "mongodb-internal-macros"
-version = "3.3.0"
+version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "63981427a0f26b89632fd2574280e069d09fb2912a3138da15de0174d11dd077"
+checksum = "aad8ee1af511fd4031cd3ddf37667c625182075c75a66ae3c81db8b8fdf91e9e"
dependencies = [
"macro_magic",
"proc-macro2",
@@ -5389,11 +5392,15 @@ dependencies = [
"prometheus 0.14.0",
"prometheus-client",
"prost 0.13.5",
- "quick-xml 0.38.4",
+ "quick-xml",
"rand 0.8.5",
"redb",
"redis",
"reqsign",
+ "reqsign-aws-v4",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "reqsign-http-send-reqwest",
"reqwest",
"rocksdb",
"rustls-native-certs 0.8.2",
@@ -6376,7 +6383,7 @@ version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1"
dependencies = [
- "heck 0.4.1",
+ "heck 0.5.0",
"itertools 0.14.0",
"log",
"multimap",
@@ -6542,16 +6549,6 @@ dependencies = [
"winapi",
]
-[[package]]
-name = "quick-xml"
-version = "0.37.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb"
-dependencies = [
- "memchr",
- "serde",
-]
-
[[package]]
name = "quick-xml"
version = "0.38.4"
@@ -6970,18 +6967,88 @@ dependencies = [
"log",
"once_cell",
"percent-encoding",
- "quick-xml 0.37.5",
"rand 0.8.5",
"reqwest",
"rsa",
+ "serde",
+ "serde_json",
+ "sha1",
+ "sha2",
+]
+
+[[package]]
+name = "reqsign-aws-v4"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4510c2a3e42b653cf788d560a3d54b0ae4cc315a62aaba773554f18319c0db0b"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "bytes",
+ "form_urlencoded",
+ "http 1.3.1",
+ "log",
+ "percent-encoding",
+ "quick-xml",
+ "reqsign-core",
"rust-ini",
"serde",
"serde_json",
+ "serde_urlencoded",
+ "sha1",
+]
+
+[[package]]
+name = "reqsign-core"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39da118ccf3bdb067ac6cc40136fec99bc5ba418cbd388dc88e4ce0e5d0b1423"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "base64 0.22.1",
+ "bytes",
+ "form_urlencoded",
+ "hex",
+ "hmac",
+ "http 1.3.1",
+ "jiff",
+ "log",
+ "percent-encoding",
"sha1",
"sha2",
+ "windows-sys 0.61.2",
+]
+
+[[package]]
+name = "reqsign-file-read-tokio"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "669ea66036266a9ac371d2e63cc7d345e69994da0168b4e6f3487fe21e126f76"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "reqsign-core",
"tokio",
]
+[[package]]
+name = "reqsign-http-send-reqwest"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "46186bce769674f9200ad01af6f2ca42de3e819ddc002fff1edae135bfb6cd9c"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "bytes",
+ "futures-channel",
+ "http 1.3.1",
+ "http-body-util",
+ "reqsign-core",
+ "reqwest",
+ "wasm-bindgen-futures",
+]
+
[[package]]
name = "reqwest"
version = "0.12.24"
@@ -7027,9 +7094,9 @@ dependencies = [
[[package]]
name = "resolv-conf"
-version = "0.7.5"
+version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b3789b30bd25ba102de4beabd95d21ac45b69b1be7d14522bab988c526d6799"
+checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7"
[[package]]
name = "revision"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index f332c68d2..5ad6f706d 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -199,9 +199,10 @@ services-redis = ["dep:redis", "dep:bb8",
"redis?/tokio-rustls-comp"]
services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"]
services-s3 = [
- "dep:reqsign",
- "reqsign?/services-aws",
- "reqsign?/reqwest_request",
+ "dep:reqsign-core",
+ "dep:reqsign-aws-v4",
+ "dep:reqsign-file-read-tokio",
+ "dep:reqsign-http-send-reqwest",
"dep:crc32c",
]
services-seafile = []
@@ -252,7 +253,6 @@ jiff = { version = "0.2.15", features = ["serde"] }
log = "0.4"
md-5 = "0.10"
percent-encoding = "2"
-url = "2.5"
quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] }
reqwest = { version = "0.12.24", features = [
"stream",
@@ -260,6 +260,7 @@ reqwest = { version = "0.12.24", features = [
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.48", features = ["sync", "io-util"] }
+url = "2.5"
uuid = { version = "1", features = ["serde", "v4"] }
# Test only dependencies
@@ -280,6 +281,11 @@ sqlx = { version = "0.8.0", features = [
# For http based services.
reqsign = { version = "0.16.5", default-features = false, optional = true }
+# For S3 service migration to v1
+reqsign-aws-v4 = { version = "2.0.1", default-features = false, optional =
true }
+reqsign-core = { version = "2.0.1", default-features = false, optional = true }
+reqsign-file-read-tokio = { version = "2.0.1", default-features = false,
optional = true }
+reqsign-http-send-reqwest = { version = "2.0.1", default-features = false,
optional = true }
# for self-referencing structs
ouroboros = { version = "0.18.4", optional = true }
@@ -408,6 +414,7 @@ probe = { version = "0.5.1", optional = true }
[target.'cfg(target_arch = "wasm32")'.dependencies]
backon = { version = "1.6", features = ["gloo-timers-sleep"] }
getrandom = { version = "0.2", features = ["js"] }
+jiff = { version = "0.2.15", features = ["serde", "js"] }
tokio = { version = "1.48", features = ["time"] }
uuid = { version = "1.18", features = ["serde", "v4", "js"] }
diff --git a/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml
b/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml
index 0e898209e..fe3fd7696 100644
--- a/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml
+++ b/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml
@@ -27,4 +27,5 @@ version.workspace = true
[dependencies]
opendal = { path = "../..", features = ["tests"] }
tokio = { version = "1", features = ["full"] }
+tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
uuid = { version = "1", features = ["serde", "v4"] }
diff --git a/core/edge/s3_aws_assume_role_with_web_identity/src/main.rs
b/core/edge/s3_aws_assume_role_with_web_identity/src/main.rs
index 333f3c200..1b3774c29 100644
--- a/core/edge/s3_aws_assume_role_with_web_identity/src/main.rs
+++ b/core/edge/s3_aws_assume_role_with_web_identity/src/main.rs
@@ -20,6 +20,12 @@ use opendal::raw::tests::init_test_service;
#[tokio::main]
async fn main() -> Result<()> {
+ let _ = tracing_subscriber::fmt()
+ .pretty()
+ .with_test_writer()
+ .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+ .try_init();
+
let op = init_test_service()?.expect("service must be init");
assert_eq!(op.info().scheme(), opendal::services::S3_SCHEME);
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 912445d6e..c5d42ba5f 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -21,7 +21,6 @@ use std::fmt::Write;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
-use std::sync::atomic::AtomicBool;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
@@ -33,11 +32,17 @@ use log::debug;
use log::warn;
use md5::Digest;
use md5::Md5;
-use reqsign::AwsAssumeRoleLoader;
-use reqsign::AwsConfig;
-use reqsign::AwsCredentialLoad;
-use reqsign::AwsDefaultLoader;
-use reqsign::AwsV4Signer;
+use reqsign_aws_v4::AssumeRoleCredentialProvider;
+use reqsign_aws_v4::Credential;
+use reqsign_aws_v4::DefaultCredentialProvider;
+use reqsign_aws_v4::RequestSigner as AwsV4Signer;
+use reqsign_aws_v4::StaticCredentialProvider;
+use reqsign_core::Context;
+use reqsign_core::OsEnv;
+use reqsign_core::ProvideCredentialChain;
+use reqsign_core::Signer;
+use reqsign_file_read_tokio::TokioFileRead;
+use reqsign_http_send_reqwest::ReqwestHttpSend;
use reqwest::Url;
use super::S3_SCHEME;
@@ -75,10 +80,9 @@ const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
pub struct S3Builder {
pub(super) config: S3Config,
- pub(super) customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,
-
#[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client`
instead")]
pub(super) http_client: Option<HttpClient>,
+ pub(super) credential_providers:
Option<ProvideCredentialChain<Credential>>,
}
impl Debug for S3Builder {
@@ -443,15 +447,6 @@ impl S3Builder {
self
}
- /// Adding a customized credential load for service.
- ///
- /// If customized_credential_load has been set, we will ignore all other
- /// credential load methods.
- pub fn customized_credential_load(mut self, cred: Box<dyn
AwsCredentialLoad>) -> Self {
- self.customized_credential_load = Some(cred);
- self
- }
-
/// Specify the http client that used by this service.
///
/// # Notes
@@ -472,31 +467,37 @@ impl S3Builder {
self
}
- /// Check if `bucket` is valid
+ /// Replace the credential providers with a custom chain.
+ pub fn credential_provider_chain(mut self, chain:
ProvideCredentialChain<Credential>) -> Self {
+ self.credential_providers = Some(chain);
+ self
+ }
+
+ /// Check if `bucket` is valid.
/// `bucket` must be not empty and if `enable_virtual_host_style` is true
- /// it couldn't contain dot(.) character
- fn is_bucket_valid(&self) -> bool {
- if self.config.bucket.is_empty() {
+ /// it could not contain dot (.) character.
+ fn is_bucket_valid(config: &S3Config) -> bool {
+ if config.bucket.is_empty() {
return false;
}
// If enable virtual host style, `bucket` will reside in domain part,
// for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
// so `bucket` with dot can't be recognized correctly for this format.
- if self.config.enable_virtual_host_style &&
self.config.bucket.contains('.') {
+ if config.enable_virtual_host_style && config.bucket.contains('.') {
return false;
}
true
}
/// Build endpoint with given region.
- fn build_endpoint(&self, region: &str) -> String {
+ fn build_endpoint(config: &S3Config, region: &str) -> String {
let bucket = {
- debug_assert!(self.is_bucket_valid(), "bucket must be valid");
+ debug_assert!(Self::is_bucket_valid(config), "bucket must be
valid");
- self.config.bucket.as_str()
+ config.bucket.as_str()
};
- let mut endpoint = match &self.config.endpoint {
+ let mut endpoint = match &config.endpoint {
Some(endpoint) => {
if endpoint.starts_with("http") {
endpoint.to_string()
@@ -527,7 +528,7 @@ impl S3Builder {
};
// Apply virtual host style.
- if self.config.enable_virtual_host_style {
+ if config.enable_virtual_host_style {
endpoint = endpoint.replace("//", &format!("//{bucket}."))
} else {
write!(endpoint, "/{bucket}").expect("write into string must
succeed");
@@ -696,15 +697,22 @@ impl S3Builder {
impl Builder for S3Builder {
type Config = S3Config;
- fn build(mut self) -> Result<impl Access> {
+ fn build(self) -> Result<impl Access> {
debug!("backend build started: {:?}", &self);
- let root =
normalize_root(&self.config.root.clone().unwrap_or_default());
+ #[allow(deprecated)]
+ let S3Builder {
+ mut config,
+ http_client,
+ credential_providers,
+ } = self;
+
+ let root = normalize_root(&config.root.clone().unwrap_or_default());
debug!("backend use root {}", &root);
// Handle bucket name.
- let bucket = if self.is_bucket_valid() {
- Ok(&self.config.bucket)
+ let bucket = if Self::is_bucket_valid(&config) {
+ Ok(&config.bucket)
} else {
Err(
Error::new(ErrorKind::ConfigInvalid, "The bucket is
misconfigured")
@@ -713,14 +721,14 @@ impl Builder for S3Builder {
}?;
debug!("backend use bucket {}", &bucket);
- let default_storage_class = match &self.config.default_storage_class {
+ let default_storage_class = match &config.default_storage_class {
None => None,
Some(v) => Some(
build_header_value(v).map_err(|err| err.with_context("key",
"storage_class"))?,
),
};
- let server_side_encryption = match &self.config.server_side_encryption
{
+ let server_side_encryption = match &config.server_side_encryption {
None => None,
Some(v) => Some(
build_header_value(v)
@@ -729,7 +737,7 @@ impl Builder for S3Builder {
};
let server_side_encryption_aws_kms_key_id =
- match &self.config.server_side_encryption_aws_kms_key_id {
+ match &config.server_side_encryption_aws_kms_key_id {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key",
"server_side_encryption_aws_kms_key_id")
@@ -737,7 +745,7 @@ impl Builder for S3Builder {
};
let server_side_encryption_customer_algorithm =
- match &self.config.server_side_encryption_customer_algorithm {
+ match &config.server_side_encryption_customer_algorithm {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key",
"server_side_encryption_customer_algorithm")
@@ -745,7 +753,7 @@ impl Builder for S3Builder {
};
let server_side_encryption_customer_key =
- match &self.config.server_side_encryption_customer_key {
+ match &config.server_side_encryption_customer_key {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key",
"server_side_encryption_customer_key")
@@ -753,14 +761,14 @@ impl Builder for S3Builder {
};
let server_side_encryption_customer_key_md5 =
- match &self.config.server_side_encryption_customer_key_md5 {
+ match &config.server_side_encryption_customer_key_md5 {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key",
"server_side_encryption_customer_key_md5")
})?),
};
- let checksum_algorithm = match
self.config.checksum_algorithm.as_deref() {
+ let checksum_algorithm = match config.checksum_algorithm.as_deref() {
Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
Some("md5") => Some(ChecksumAlgorithm::Md5),
None => None,
@@ -772,109 +780,102 @@ impl Builder for S3Builder {
}
};
- // This is our current config.
- let mut cfg = AwsConfig::default();
- if !self.config.disable_config_load {
- #[cfg(not(target_arch = "wasm32"))]
- {
- cfg = cfg.from_profile();
- cfg = cfg.from_env();
- }
- }
-
- if let Some(ref v) = self.config.region {
- cfg.region = Some(v.to_string());
- }
-
- if cfg.region.is_none() {
- return Err(Error::new(
- ErrorKind::ConfigInvalid,
- "region is missing. Please find it by S3::detect_region() or
set them in env.",
- )
- .with_operation("Builder::build")
- .with_context("service", S3_SCHEME));
- }
-
- let region = cfg.region.to_owned().unwrap();
+ // Determine the region
+ let region = if let Some(ref v) = config.region {
+ v.to_string()
+ } else {
+ std::env::var("AWS_REGION")
+ .or_else(|_| std::env::var("AWS_DEFAULT_REGION"))
+ .map_err(|_| {
+ Error::new(
+ ErrorKind::ConfigInvalid,
+ "region is missing. Please find it by
S3::detect_region() or set them in env.",
+ )
+ .with_operation("Builder::build")
+ .with_context("service", S3_SCHEME)
+ })?
+ };
debug!("backend use region: {region}");
- // Retain the user's endpoint if it exists; otherwise, try loading it
from the environment.
- self.config.endpoint = self.config.endpoint.or_else(||
cfg.endpoint_url.clone());
+ if config.endpoint.is_none() && !config.disable_config_load {
+ let endpoint_from_env = std::env::var("AWS_ENDPOINT_URL")
+ .or_else(|_| std::env::var("AWS_ENDPOINT"))
+ .or_else(|_| std::env::var("AWS_S3_ENDPOINT"))
+ .ok();
+ if let Some(endpoint) = endpoint_from_env {
+ let normalized = endpoint.trim_end_matches('/').to_string();
+ config.endpoint = Some(normalized);
+ }
+ }
// Building endpoint.
- let endpoint = self.build_endpoint(®ion);
+ let endpoint = Self::build_endpoint(&config, ®ion);
debug!("backend use endpoint: {endpoint}");
- // Setting all value from user input if available.
- if let Some(v) = self.config.access_key_id {
- cfg.access_key_id = Some(v)
- }
- if let Some(v) = self.config.secret_access_key {
- cfg.secret_access_key = Some(v)
- }
- if let Some(v) = self.config.session_token {
- cfg.session_token = Some(v)
- }
+ // Create the context for reqsign-core
+ let ctx = Context::new()
+ .with_file_read(TokioFileRead)
+
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
+ .with_env(OsEnv);
- let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
- // If customized_credential_load is set, we will use it.
- if let Some(v) = self.customized_credential_load {
- loader = Some(v);
- }
+ let mut provider = {
+ let mut builder = DefaultCredentialProvider::builder();
- // If role_arn is set, we must use AssumeRoleLoad.
- if let Some(role_arn) = self.config.role_arn {
- // use current env as source credential loader.
- let default_loader =
- AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(),
cfg.clone());
-
- // Build the config for assume role.
- let mut assume_role_cfg = AwsConfig {
- region: Some(region.clone()),
- role_arn: Some(role_arn),
- external_id: self.config.external_id.clone(),
- sts_regional_endpoints: "regional".to_string(),
- ..Default::default()
- };
+ if config.disable_config_load {
+ builder = builder.disable_env(true).disable_profile(true);
+ }
- // override default role_session_name if set
- if let Some(name) = self.config.role_session_name {
- assume_role_cfg.role_session_name = name;
+ if config.disable_ec2_metadata {
+ builder = builder.disable_imds(true);
}
- let assume_role_loader = AwsAssumeRoleLoader::new(
- GLOBAL_REQWEST_CLIENT.clone().clone(),
- assume_role_cfg,
- Box::new(default_loader),
- )
- .map_err(|err| {
- Error::new(
- ErrorKind::ConfigInvalid,
- "The assume_role_loader is misconfigured",
- )
- .with_context("service", S3_SCHEME)
- .set_source(err)
- })?;
- loader = Some(Box::new(assume_role_loader));
+ ProvideCredentialChain::new().push(builder.build())
+ };
+
+ // Insert static key if user provided.
+ if let (Some(ak), Some(sk)) = (&config.access_key_id,
&config.secret_access_key) {
+ let static_provider = if let Some(token) =
config.session_token.as_deref() {
+ StaticCredentialProvider::new(ak, sk).with_session_token(token)
+ } else {
+ StaticCredentialProvider::new(ak, sk)
+ };
+ provider = provider.push_front(static_provider);
}
- // If loader is not set, we will use default loader.
- let loader = match loader {
- Some(v) => v,
- None => {
- let mut default_loader =
-
AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
- if self.config.disable_ec2_metadata {
- default_loader =
default_loader.with_disable_ec2_metadata();
- }
- Box::new(default_loader)
+ // Insert assume role provider if user provided.
+ if let Some(role_arn) = &config.role_arn {
+ let sts_ctx = ctx.clone();
+ let sts_request_signer = AwsV4Signer::new("sts", ®ion);
+ let sts_signer = Signer::new(sts_ctx, provider,
sts_request_signer);
+ let mut assume_role_provider =
+ AssumeRoleCredentialProvider::new(role_arn.clone(), sts_signer)
+ .with_region(region.clone())
+ .with_regional_sts_endpoint();
+
+ if let Some(external_id) = &config.external_id {
+ assume_role_provider =
assume_role_provider.with_external_id(external_id.clone());
+ }
+ if let Some(role_session_name) = &config.role_session_name {
+ assume_role_provider =
+
assume_role_provider.with_role_session_name(role_session_name.clone());
}
+ provider =
ProvideCredentialChain::new().push(assume_role_provider);
+ }
+
+ // Replace provider if user provide their own.
+ let provider = if let Some(credential_providers) =
credential_providers {
+ credential_providers
+ } else {
+ provider
};
- let signer = AwsV4Signer::new("s3", ®ion);
+ // Create request signer for S3
+ let request_signer = AwsV4Signer::new("s3", ®ion);
+
+ // Create the signer
+ let signer = Signer::new(ctx, provider, request_signer);
- let delete_max_size = self
- .config
+ let delete_max_size = config
.delete_max_size
.unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
@@ -891,16 +892,11 @@ impl Builder for S3Builder {
stat_with_if_none_match: true,
stat_with_if_modified_since: true,
stat_with_if_unmodified_since: true,
- stat_with_override_cache_control: !self
- .config
- .disable_stat_with_override,
- stat_with_override_content_disposition: !self
- .config
- .disable_stat_with_override,
- stat_with_override_content_type: !self
- .config
+ stat_with_override_cache_control:
!config.disable_stat_with_override,
+ stat_with_override_content_disposition: !config
.disable_stat_with_override,
- stat_with_version: self.config.enable_versioning,
+ stat_with_override_content_type:
!config.disable_stat_with_override,
+ stat_with_version: config.enable_versioning,
read: true,
read_with_if_match: true,
@@ -910,17 +906,17 @@ impl Builder for S3Builder {
read_with_override_cache_control: true,
read_with_override_content_disposition: true,
read_with_override_content_type: true,
- read_with_version: self.config.enable_versioning,
+ read_with_version: config.enable_versioning,
write: true,
write_can_empty: true,
write_can_multi: true,
- write_can_append:
self.config.enable_write_with_append,
+ write_can_append: config.enable_write_with_append,
write_with_cache_control: true,
write_with_content_type: true,
write_with_content_encoding: true,
- write_with_if_match:
!self.config.disable_write_with_if_match,
+ write_with_if_match:
!config.disable_write_with_if_match,
write_with_if_not_exists: true,
write_with_user_metadata: true,
@@ -939,7 +935,7 @@ impl Builder for S3Builder {
delete: true,
delete_max_size: Some(delete_max_size),
- delete_with_version: self.config.enable_versioning,
+ delete_with_version: config.enable_versioning,
copy: true,
@@ -947,8 +943,8 @@ impl Builder for S3Builder {
list_with_limit: true,
list_with_start_after: true,
list_with_recursive: true,
- list_with_versions: self.config.enable_versioning,
- list_with_deleted: self.config.enable_versioning,
+ list_with_versions: config.enable_versioning,
+ list_with_deleted: config.enable_versioning,
presign: true,
presign_stat: true,
@@ -962,7 +958,7 @@ impl Builder for S3Builder {
// allow deprecated api here for compatibility
#[allow(deprecated)]
- if let Some(client) = self.http_client {
+ if let Some(client) = http_client {
am.update_http_client(|_| client);
}
@@ -977,12 +973,10 @@ impl Builder for S3Builder {
server_side_encryption_customer_key,
server_side_encryption_customer_key_md5,
default_storage_class,
- allow_anonymous: self.config.allow_anonymous,
- disable_list_objects_v2: self.config.disable_list_objects_v2,
- enable_request_payer: self.config.enable_request_payer,
+ allow_anonymous: config.allow_anonymous,
+ disable_list_objects_v2: config.disable_list_objects_v2,
+ enable_request_payer: config.enable_request_payer,
signer,
- loader,
- credential_loaded: AtomicBool::new(false),
checksum_algorithm,
}),
})
@@ -1122,9 +1116,9 @@ impl Access for S3Backend {
"operation is not supported",
)),
};
- let mut req = req?;
+ let req = req?;
- self.core.sign_query(&mut req, expire).await?;
+ let req = self.core.sign_query(req, expire).await?;
// We don't need this request anymore, consume it directly.
let (parts, _) = req.into_parts();
@@ -1136,3 +1130,109 @@ impl Access for S3Backend {
)))
}
}
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_is_valid_bucket() {
+ let bucket_cases = vec![
+ ("", false, false),
+ ("test", false, true),
+ ("test.xyz", false, true),
+ ("", true, false),
+ ("test", true, true),
+ ("test.xyz", true, false),
+ ];
+
+ for (bucket, enable_virtual_host_style, expected) in bucket_cases {
+ let mut b = S3Builder::default();
+ b = b.bucket(bucket);
+ if enable_virtual_host_style {
+ b = b.enable_virtual_host_style();
+ }
+ assert_eq!(S3Builder::is_bucket_valid(&b.config), expected)
+ }
+ }
+
+ #[test]
+ fn test_build_endpoint() {
+ let _ = tracing_subscriber::fmt().with_test_writer().try_init();
+
+ let endpoint_cases = vec![
+ Some("s3.amazonaws.com"),
+ Some("https://s3.amazonaws.com"),
+ Some("https://s3.us-east-2.amazonaws.com"),
+ None,
+ ];
+
+ for endpoint in &endpoint_cases {
+ let mut b = S3Builder::default().bucket("test");
+ if let Some(endpoint) = endpoint {
+ b = b.endpoint(endpoint);
+ }
+
+ let endpoint = S3Builder::build_endpoint(&b.config, "us-east-2");
+ assert_eq!(endpoint, "https://s3.us-east-2.amazonaws.com/test");
+ }
+
+ for endpoint in &endpoint_cases {
+ let mut b = S3Builder::default()
+ .bucket("test")
+ .enable_virtual_host_style();
+ if let Some(endpoint) = endpoint {
+ b = b.endpoint(endpoint);
+ }
+
+ let endpoint = S3Builder::build_endpoint(&b.config, "us-east-2");
+ assert_eq!(endpoint, "https://test.s3.us-east-2.amazonaws.com");
+ }
+ }
+
+ #[tokio::test]
+ async fn test_detect_region() {
+ let cases = vec![
+ (
+ "aws s3 without region in endpoint",
+ "https://s3.amazonaws.com",
+ "example",
+ Some("us-east-1"),
+ ),
+ (
+ "aws s3 with region in endpoint",
+ "https://s3.us-east-1.amazonaws.com",
+ "example",
+ Some("us-east-1"),
+ ),
+ (
+ "oss with public endpoint",
+ "https://oss-ap-southeast-1.aliyuncs.com",
+ "example",
+ Some("oss-ap-southeast-1"),
+ ),
+ (
+ "oss with internal endpoint",
+ "https://oss-cn-hangzhou-internal.aliyuncs.com",
+ "example",
+ Some("oss-cn-hangzhou-internal"),
+ ),
+ (
+ "r2",
+ "https://abc.xxxxx.r2.cloudflarestorage.com",
+ "example",
+ Some("auto"),
+ ),
+ (
+ "invalid service",
+ "https://opendal.apache.org",
+ "example",
+ None,
+ ),
+ ];
+
+ for (name, endpoint, bucket, expected) in cases {
+ let region = S3Builder::detect_region(endpoint, bucket).await;
+ assert_eq!(region.as_deref(), expected, "{name}");
+ }
+ }
+}
diff --git a/core/src/services/s3/config.rs b/core/src/services/s3/config.rs
index ee299ade9..43342e677 100644
--- a/core/src/services/s3/config.rs
+++ b/core/src/services/s3/config.rs
@@ -250,9 +250,8 @@ impl crate::Configurator for S3Config {
fn into_builder(self) -> Self::Builder {
S3Builder {
config: self,
- customized_credential_load: None,
-
http_client: None,
+ credential_providers: None,
}
}
}
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 4d3f14bd2..28576e4cd 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -19,8 +19,6 @@ use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Write;
use std::sync::Arc;
-use std::sync::atomic;
-use std::sync::atomic::AtomicBool;
use std::time::Duration;
use base64::Engine;
@@ -41,9 +39,8 @@ use http::header::IF_MATCH;
use http::header::IF_MODIFIED_SINCE;
use http::header::IF_NONE_MATCH;
use http::header::IF_UNMODIFIED_SINCE;
-use reqsign::AwsCredential;
-use reqsign::AwsCredentialLoad;
-use reqsign::AwsV4Signer;
+use reqsign_aws_v4::Credential;
+use reqsign_core::Signer;
use serde::Deserialize;
use serde::Serialize;
@@ -102,9 +99,7 @@ pub struct S3Core {
pub disable_list_objects_v2: bool,
pub enable_request_payer: bool,
- pub signer: AwsV4Signer,
- pub loader: Box<dyn AwsCredentialLoad>,
- pub credential_loaded: AtomicBool,
+ pub signer: Signer<Credential>,
pub checksum_algorithm: Option<ChecksumAlgorithm>,
}
@@ -119,52 +114,43 @@ impl Debug for S3Core {
}
impl S3Core {
- /// If credential is not found, we will not sign the request.
- async fn load_credential(&self) -> Result<Option<AwsCredential>> {
- let cred = self
- .loader
- .load_credential(GLOBAL_REQWEST_CLIENT.clone())
+ pub async fn sign_query<T>(&self, req: Request<T>, duration: Duration) ->
Result<Request<T>> {
+ // Skip signing for anonymous access
+ if self.allow_anonymous {
+ return Ok(req);
+ }
+
+ // Sign the request with presigned URL
+ let (mut parts, body) = req.into_parts();
+
+ self.signer
+ .sign(&mut parts, Some(duration))
.await
- .map_err(new_request_credential_error)?;
+ .map_err(|e| new_request_sign_error(e.into()))?;
- if let Some(cred) = cred {
- // Update credential_loaded to true if we have load credential
successfully.
- self.credential_loaded
- .store(true, atomic::Ordering::Relaxed);
- return Ok(Some(cred));
- }
+ // Always remove host header, let users' client to set it based on HTTP
+ // version.
+ //
+ // As discussed in
<https://github.com/seanmonstar/reqwest/issues/1809>,
+ // google server could send RST_STREAM of PROTOCOL_ERROR if our request
+ // contains host header.
+ parts.headers.remove(HOST);
- // If we have load credential before but failed to load this time, we
should
- // return error instead.
- if self.credential_loaded.load(atomic::Ordering::Relaxed) {
- return Err(Error::new(
- ErrorKind::PermissionDenied,
- "credential was previously loaded successfully but has failed
this time",
- )
- .set_temporary());
- }
+ Ok(Request::from_parts(parts, body))
+ }
- // Credential is empty and users allow anonymous access, we will not
sign the request.
+ pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>>
{
+ // Skip signing for anonymous access
if self.allow_anonymous {
- return Ok(None);
+ return self.info.http_client().send(req).await;
}
- Err(Error::new(
- ErrorKind::PermissionDenied,
- "no valid credential found and anonymous access is not allowed",
- ))
- }
-
- pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
- let cred = if let Some(cred) = self.load_credential().await? {
- cred
- } else {
- return Ok(());
- };
+ let (mut parts, body) = req.into_parts();
self.signer
- .sign(req, &cred)
- .map_err(new_request_sign_error)?;
+ .sign(&mut parts, None)
+ .await
+ .map_err(|e| new_request_sign_error(e.into()))?;
// Always remove host header, let users' client to set it based on HTTP
// version.
@@ -172,21 +158,26 @@ impl S3Core {
// As discussed in
<https://github.com/seanmonstar/reqwest/issues/1809>,
// google server could send RST_STREAM of PROTOCOL_ERROR if our request
// contains host header.
- req.headers_mut().remove(HOST);
+ parts.headers.remove(HOST);
- Ok(())
+ self.info
+ .http_client()
+ .send(Request::from_parts(parts, body))
+ .await
}
- pub async fn sign_query<T>(&self, req: &mut Request<T>, duration:
Duration) -> Result<()> {
- let cred = if let Some(cred) = self.load_credential().await? {
- cred
- } else {
- return Ok(());
- };
+ pub async fn fetch(&self, req: Request<Buffer>) ->
Result<Response<HttpBody>> {
+ // Skip signing for anonymous access
+ if self.allow_anonymous {
+ return self.info.http_client().fetch(req).await;
+ }
+
+ let (mut parts, body) = req.into_parts();
self.signer
- .sign_query(req, duration, &cred)
- .map_err(new_request_sign_error)?;
+ .sign(&mut parts, None)
+ .await
+ .map_err(|e| new_request_sign_error(e.into()))?;
// Always remove host header, let users' client to set it based on HTTP
// version.
@@ -194,14 +185,12 @@ impl S3Core {
// As discussed in
<https://github.com/seanmonstar/reqwest/issues/1809>,
// google server could send RST_STREAM of PROTOCOL_ERROR if our request
// contains host header.
- req.headers_mut().remove(HOST);
+ parts.headers.remove(HOST);
- Ok(())
- }
-
- #[inline]
- pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>>
{
- self.info.http_client().send(req).await
+ self.info
+ .http_client()
+ .fetch(Request::from_parts(parts, body))
+ .await
}
/// # Note
@@ -516,11 +505,8 @@ impl S3Core {
range: BytesRange,
args: &OpRead,
) -> Result<Response<HttpBody>> {
- let mut req = self.s3_get_object_request(path, range, args)?;
-
- self.sign(&mut req).await?;
-
- self.info.http_client().fetch(req).await
+ let req = self.s3_get_object_request(path, range, args)?;
+ self.fetch(req).await
}
pub fn s3_put_object_request(
@@ -603,10 +589,7 @@ impl S3Core {
}
pub async fn s3_head_object(&self, path: &str, args: OpStat) ->
Result<Response<Buffer>> {
- let mut req = self.s3_head_object_request(path, args)?;
-
- self.sign(&mut req).await?;
-
+ let req = self.s3_head_object_request(path, args)?;
self.send(req).await
}
@@ -634,14 +617,12 @@ impl S3Core {
// Set request payer header if enabled.
req = self.insert_request_payer_header(req);
- let mut req = req
+ let req = req
// Inject operation to the request.
.extension(Operation::Delete)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
-
self.send(req).await
}
@@ -696,15 +677,13 @@ impl S3Core {
// Set request payer header if enabled.
req = self.insert_request_payer_header(req);
- let mut req = req
+ let req = req
// Inject operation to the request.
.extension(Operation::Copy)
.header(constants::X_AMZ_COPY_SOURCE, &source)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
-
self.send(req).await
}
@@ -737,14 +716,12 @@ impl S3Core {
// Set request payer header if enabled.
req = self.insert_request_payer_header(req);
- let mut req = req
+ let req = req
// Inject operation to the request.
.extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
-
self.send(req).await
}
@@ -789,14 +766,12 @@ impl S3Core {
// Set request payer header if enabled.
req = self.insert_request_payer_header(req);
- let mut req = req
+ let req = req
// Inject operation to the request.
.extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
-
self.send(req).await
}
@@ -847,9 +822,7 @@ impl S3Core {
// Inject operation to the request.
req = req.extension(Operation::Write);
- let mut req =
req.body(Buffer::new()).map_err(new_request_build_error)?;
-
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
self.send(req).await
}
@@ -930,12 +903,10 @@ impl S3Core {
// Inject operation to the request.
req = req.extension(Operation::Write);
- let mut req = req
+ let req = req
.body(Buffer::from(Bytes::from(content)))
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
-
self.send(req).await
}
@@ -959,13 +930,12 @@ impl S3Core {
// Set request payer header if enabled.
req = self.insert_request_payer_header(req);
- let mut req = req
+ let req = req
// Inject operation to the request.
.extension(Operation::Write)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
self.send(req).await
}
@@ -1001,12 +971,10 @@ impl S3Core {
// Inject operation to the request.
req = req.extension(Operation::Delete);
- let mut req = req
+ let req = req
.body(Buffer::from(Bytes::from(content)))
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
-
self.send(req).await
}
@@ -1050,14 +1018,12 @@ impl S3Core {
// Set request payer header if enabled.
req = self.insert_request_payer_header(req);
- let mut req = req
+ let req = req
// Inject operation to the request.
.extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
-
self.send(req).await
}
}
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index c88ed61ea..bd73388fb 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -66,12 +66,10 @@ impl S3Writer {
impl oio::MultipartWrite for S3Writer {
async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
- let mut req = self
+ let req = self
.core
.s3_put_object_request(&self.path, Some(size), &self.op, body)?;
- self.core.sign(&mut req).await?;
-
let resp = self.core.send(req).await?;
let status = resp.status();
@@ -117,7 +115,7 @@ impl oio::MultipartWrite for S3Writer {
let checksum = self.core.calculate_checksum(&body);
- let mut req = self.core.s3_upload_part_request(
+ let req = self.core.s3_upload_part_request(
&self.path,
upload_id,
part_number,
@@ -126,8 +124,6 @@ impl oio::MultipartWrite for S3Writer {
checksum.clone(),
)?;
- self.core.sign(&mut req).await?;
-
let resp = self.core.send(req).await?;
let status = resp.status();
@@ -247,12 +243,10 @@ impl oio::AppendWrite for S3Writer {
}
async fn append(&self, offset: u64, size: u64, body: Buffer) ->
Result<Metadata> {
- let mut req = self
+ let req = self
.core
.s3_append_object_request(&self.path, offset, size, &self.op,
body)?;
- self.core.sign(&mut req).await?;
-
let resp = self.core.send(req).await?;
let status = resp.status();