This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch fix-azblob in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 108f62f120067392598896539408805a81aaf77a Author: Xuanwo <[email protected]> AuthorDate: Thu Jun 1 01:31:53 2023 +0800 fix(services/azblob): Fix batch delete doesn't work on azure Signed-off-by: Xuanwo <[email protected]> --- .github/workflows/service_test_azblob.yml | 29 ++++++++++++++++++++++++++ Cargo.lock | 4 ++-- core/Cargo.toml | 4 ++-- core/src/raw/http_util/multipart.rs | 34 ++++++++++++++++++++++++++----- core/src/services/azblob/backend.rs | 2 -- core/src/services/azblob/core.rs | 19 ++++++++++++----- core/tests/behavior/write.rs | 16 +++++++++++++++ 7 files changed, 92 insertions(+), 16 deletions(-) diff --git a/.github/workflows/service_test_azblob.yml b/.github/workflows/service_test_azblob.yml index 969895c39..999c57ca0 100644 --- a/.github/workflows/service_test_azblob.yml +++ b/.github/workflows/service_test_azblob.yml @@ -70,3 +70,32 @@ jobs: OPENDAL_AZBLOB_ENDPOINT: "http://127.0.0.1:10000/devstoreaccount1" OPENDAL_AZBLOB_ACCOUNT_NAME: devstoreaccount1 OPENDAL_AZBLOB_ACCOUNT_KEY: Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== + + azure_azblob: + runs-on: ubuntu-latest + if: github.event_name == 'push' || !github.event.pull_request.head.repo.fork + steps: + - uses: actions/checkout@v3 + - name: Setup Rust toolchain + uses: ./.github/actions/setup + + - name: Load secret + id: op-load-secret + uses: 1password/load-secrets-action@v1 + with: + export-env: true + env: + OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }} + OPENDAL_AZBLOB_TEST: op://services/azblob/test + OPENDAL_AZBLOB_CONTAINER: op://services/azblob/container + OPENDAL_AZBLOB_ENDPOINT: op://services/azblob/endpoint + OPENDAL_AZBLOB_ACCOUNT_NAME: op://services/azblob/account_name + OPENDAL_AZBLOB_ACCOUNT_KEY: op://services/azblob/account_key + + - name: Test + shell: bash + working-directory: core + run: cargo test azblob -- --show-output + env: + RUST_BACKTRACE: full + RUST_LOG: debug diff --git a/Cargo.lock b/Cargo.lock index 097003bb9..40ee02f79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3727,9 +3727,9 @@ checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" [[package]] name = "reqsign" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b04f5fccb94d61c154f0d8520ec42e79afdc145f4b1a392faa269874995fda66" +checksum = "b6cb65eb3405f9c2de5c18bfc37338d6bbdb2c35eb8eb0e946208cbb564e4833" dependencies = [ "anyhow", "async-trait", diff --git a/core/Cargo.toml b/core/Cargo.toml index 249707fed..a2e1cd3b6 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -203,11 +203,10 @@ redis = { version = "0.22", features = [ "tokio-comp", "connection-manager", ], optional = true } -reqsign = { version = "0.12.0", default-features = false, optional = true } +reqsign = { version = "0.13.0", default-features = false, optional = true } reqwest = { version = "0.11.13", features = [ "stream", ], default-features = false } -url = { version = "2.2" } # version should follow reqwest rocksdb = { version = "0.21.0", default-features = false, optional = true } serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -219,6 +218,7 @@ suppaftp = { version = "4.5", default-features = false, features = [ ], optional = true } tokio = "1.27" tracing = { version = "0.1", optional = true } +url = { version = "2.2" } # version should follow reqwest uuid = { version = "1", features = ["serde", "v4"] } [dev-dependencies] diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index 6be885bb6..c30584f60 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -22,6 +22,7 @@ use bytes::BytesMut; use http::header::CONTENT_DISPOSITION; use http::header::CONTENT_LENGTH; use http::header::CONTENT_TYPE; +use http::uri::PathAndQuery; use http::HeaderMap; use http::HeaderName; use http::HeaderValue; @@ -51,7 +52,7 @@ impl<T: Part> Multipart<T> { /// Create a new multipart with random boundary. pub fn new() -> Self { Multipart { - boundary: uuid::Uuid::new_v4().to_string(), + boundary: format!("opendal-{}", uuid::Uuid::new_v4()), parts: Vec::default(), } } @@ -236,9 +237,14 @@ impl MixedPart { Self { part_headers, method: parts.method, - // TODO: Maybe we should support query too?; - uri: Uri::from_str(parts.uri.path()) - .expect("the uri used to build a mixed part must be valid"), + uri: Uri::from_str( + parts + .uri + .path_and_query() + .unwrap_or(&PathAndQuery::from_static("/")) + .as_str(), + ) + .expect("the uri used to build a mixed part must be valid"), version: parts.version, headers: parts.headers, content, @@ -284,7 +290,25 @@ impl Part for MixedPart { // Write parts headers. for (k, v) in self.part_headers.iter() { - bs.extend_from_slice(k.as_str().as_bytes()); + // Trick! + // + // Azblob could not recognize header names like `content-type` + // and requires to use `Content-Type`. So we hardcode the part + // headers name here. + match k.as_str() { + "content-type" => { + bs.extend_from_slice("Content-Type".as_bytes()); + } + "content-id" => { + bs.extend_from_slice("Content-ID".as_bytes()); + } + "content-transfer-encoding" => { + bs.extend_from_slice("Content-Transfer-Encoding".as_bytes()); + } + _ => { + bs.extend_from_slice(k.as_str().as_bytes()); + } + } bs.extend_from_slice(b": "); bs.extend_from_slice(v.as_bytes()); bs.extend_from_slice(b"\r\n"); diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 6b52b535a..eb2298d22 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -440,7 +440,6 @@ impl Builder for AzblobBuilder { let cred_loader = AzureStorageLoader::new(config_loader); let signer = AzureStorageSigner::new(); - let batch_signer = AzureStorageSigner::new().omit_service_version(); debug!("backend build finished: {:?}", &self); Ok(AzblobBackend { @@ -455,7 +454,6 @@ impl Builder for AzblobBuilder { client, loader: cred_loader, signer, - batch_signer, }), has_sas_token: self.sas_token.is_some(), }) diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs index f9f09b6a5..acf805d3b 100644 --- a/core/src/services/azblob/core.rs +++ b/core/src/services/azblob/core.rs @@ -37,6 +37,8 @@ use crate::raw::*; use crate::*; mod constants { + pub const X_MS_VERSION: &str = "x-ms-version"; + pub const X_MS_BLOB_TYPE: &str = "x-ms-blob-type"; pub const X_MS_COPY_SOURCE: &str = "x-ms-copy-source"; pub const X_MS_BLOB_CACHE_CONTROL: &str = "x-ms-blob-cache-control"; @@ -58,7 +60,6 @@ pub struct AzblobCore { pub client: HttpClient, pub loader: AzureStorageLoader, pub signer: AzureStorageSigner, - pub batch_signer: AzureStorageSigner, } impl Debug for AzblobCore { @@ -99,14 +100,22 @@ impl AzblobCore { pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> { let cred = self.load_credential().await?; + // Insert x-ms-version header for normal requests. + req.headers_mut().insert( + HeaderName::from_static(constants::X_MS_VERSION), + // 2022-11-02 is the version supported by Azurite V3 and + // used by Azure Portal, We use this version to make + // sure most our developer happy. + // + // In the future, we could allow users to configure this value. + HeaderValue::from_static("2022-11-02"), + ); self.signer.sign(req, &cred).map_err(new_request_sign_error) } async fn batch_sign<T>(&self, req: &mut Request<T>) -> Result<()> { let cred = self.load_credential().await?; - self.batch_signer - .sign(req, &cred) - .map_err(new_request_sign_error) + self.signer.sign(req, &cred).map_err(new_request_sign_error) } #[inline] @@ -537,8 +546,8 @@ impl AzblobCore { for (idx, path) in paths.iter().enumerate() { let mut req = self.azblob_delete_blob_request(path)?; - self.batch_sign(&mut req).await?; + multipart = multipart.part( MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()), ); diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index c4f747069..2efe228d5 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -107,6 +107,7 @@ macro_rules! behavior_write_tests { test_delete_with_special_chars, test_delete_not_existing, test_delete_stream, + test_remove_one_file, test_writer_write, test_writer_abort, test_writer_futures_copy, @@ -1037,6 +1038,21 @@ pub async fn test_delete_not_existing(op: Operator) -> Result<()> { Ok(()) } +/// Remove one file +pub async fn test_remove_one_file(op: Operator) -> Result<()> { + let path = uuid::Uuid::new_v4().to_string(); + let (content, _) = gen_bytes(); + + op.write(&path, content).await.expect("write must succeed"); + + op.remove(vec![path.clone()]).await?; + + // Stat it again to check. + assert!(!op.is_exist(&path).await?); + + Ok(()) +} + /// Delete via stream. pub async fn test_delete_stream(op: Operator) -> Result<()> { let dir = uuid::Uuid::new_v4().to_string();
