This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch fix-azblob
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 108f62f120067392598896539408805a81aaf77a
Author: Xuanwo <[email protected]>
AuthorDate: Thu Jun 1 01:31:53 2023 +0800

    fix(services/azblob): Fix batch delete doesn't work on azure
    
    Signed-off-by: Xuanwo <[email protected]>
---
 .github/workflows/service_test_azblob.yml | 29 ++++++++++++++++++++++++++
 Cargo.lock                                |  4 ++--
 core/Cargo.toml                           |  4 ++--
 core/src/raw/http_util/multipart.rs       | 34 ++++++++++++++++++++++++++-----
 core/src/services/azblob/backend.rs       |  2 --
 core/src/services/azblob/core.rs          | 19 ++++++++++++-----
 core/tests/behavior/write.rs              | 16 +++++++++++++++
 7 files changed, 92 insertions(+), 16 deletions(-)

diff --git a/.github/workflows/service_test_azblob.yml b/.github/workflows/service_test_azblob.yml
index 969895c39..999c57ca0 100644
--- a/.github/workflows/service_test_azblob.yml
+++ b/.github/workflows/service_test_azblob.yml
@@ -70,3 +70,32 @@ jobs:
           OPENDAL_AZBLOB_ENDPOINT: "http://127.0.0.1:10000/devstoreaccount1"
           OPENDAL_AZBLOB_ACCOUNT_NAME: devstoreaccount1
           OPENDAL_AZBLOB_ACCOUNT_KEY: Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==
+
+  azure_azblob:
+    runs-on: ubuntu-latest
+    if: github.event_name == 'push' || !github.event.pull_request.head.repo.fork
+    steps:
+      - uses: actions/checkout@v3
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+
+      - name: Load secret
+        id: op-load-secret
+        uses: 1password/load-secrets-action@v1
+        with:
+          export-env: true
+        env:
+          OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
+          OPENDAL_AZBLOB_TEST: op://services/azblob/test
+          OPENDAL_AZBLOB_CONTAINER: op://services/azblob/container
+          OPENDAL_AZBLOB_ENDPOINT: op://services/azblob/endpoint
+          OPENDAL_AZBLOB_ACCOUNT_NAME: op://services/azblob/account_name
+          OPENDAL_AZBLOB_ACCOUNT_KEY: op://services/azblob/account_key
+
+      - name: Test
+        shell: bash
+        working-directory: core
+        run: cargo test azblob -- --show-output
+        env:
+          RUST_BACKTRACE: full
+          RUST_LOG: debug
diff --git a/Cargo.lock b/Cargo.lock
index 097003bb9..40ee02f79 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3727,9 +3727,9 @@ checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848"
 
 [[package]]
 name = "reqsign"
-version = "0.12.0"
+version = "0.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b04f5fccb94d61c154f0d8520ec42e79afdc145f4b1a392faa269874995fda66"
+checksum = "b6cb65eb3405f9c2de5c18bfc37338d6bbdb2c35eb8eb0e946208cbb564e4833"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 249707fed..a2e1cd3b6 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -203,11 +203,10 @@ redis = { version = "0.22", features = [
   "tokio-comp",
   "connection-manager",
 ], optional = true }
-reqsign = { version = "0.12.0", default-features = false, optional = true }
+reqsign = { version = "0.13.0", default-features = false, optional = true }
 reqwest = { version = "0.11.13", features = [
   "stream",
 ], default-features = false }
-url = { version = "2.2" } # version should follow reqwest
 rocksdb = { version = "0.21.0", default-features = false, optional = true }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
@@ -219,6 +218,7 @@ suppaftp = { version = "4.5", default-features = false, features = [
 ], optional = true }
 tokio = "1.27"
 tracing = { version = "0.1", optional = true }
+url = { version = "2.2" } # version should follow reqwest
 uuid = { version = "1", features = ["serde", "v4"] }
 
 [dev-dependencies]
diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs
index 6be885bb6..c30584f60 100644
--- a/core/src/raw/http_util/multipart.rs
+++ b/core/src/raw/http_util/multipart.rs
@@ -22,6 +22,7 @@ use bytes::BytesMut;
 use http::header::CONTENT_DISPOSITION;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
+use http::uri::PathAndQuery;
 use http::HeaderMap;
 use http::HeaderName;
 use http::HeaderValue;
@@ -51,7 +52,7 @@ impl<T: Part> Multipart<T> {
     /// Create a new multipart with random boundary.
     pub fn new() -> Self {
         Multipart {
-            boundary: uuid::Uuid::new_v4().to_string(),
+            boundary: format!("opendal-{}", uuid::Uuid::new_v4()),
             parts: Vec::default(),
         }
     }
@@ -236,9 +237,14 @@ impl MixedPart {
         Self {
             part_headers,
             method: parts.method,
-            // TODO: Maybe we should support query too?;
-            uri: Uri::from_str(parts.uri.path())
-                .expect("the uri used to build a mixed part must be valid"),
+            uri: Uri::from_str(
+                parts
+                    .uri
+                    .path_and_query()
+                    .unwrap_or(&PathAndQuery::from_static("/"))
+                    .as_str(),
+            )
+            .expect("the uri used to build a mixed part must be valid"),
             version: parts.version,
             headers: parts.headers,
             content,
@@ -284,7 +290,25 @@ impl Part for MixedPart {
 
         // Write parts headers.
         for (k, v) in self.part_headers.iter() {
-            bs.extend_from_slice(k.as_str().as_bytes());
+            // Trick!
+            //
+            // Azblob does not recognize header names like `content-type`
+            // and requires `Content-Type` instead, so we hardcode the part
+            // header names here.
+            match k.as_str() {
+                "content-type" => {
+                    bs.extend_from_slice("Content-Type".as_bytes());
+                }
+                "content-id" => {
+                    bs.extend_from_slice("Content-ID".as_bytes());
+                }
+                "content-transfer-encoding" => {
+                    bs.extend_from_slice("Content-Transfer-Encoding".as_bytes());
+                }
+                _ => {
+                    bs.extend_from_slice(k.as_str().as_bytes());
+                }
+            }
             bs.extend_from_slice(b": ");
             bs.extend_from_slice(v.as_bytes());
             bs.extend_from_slice(b"\r\n");
diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs
index 6b52b535a..eb2298d22 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -440,7 +440,6 @@ impl Builder for AzblobBuilder {
         let cred_loader = AzureStorageLoader::new(config_loader);
 
         let signer = AzureStorageSigner::new();
-        let batch_signer = AzureStorageSigner::new().omit_service_version();
 
         debug!("backend build finished: {:?}", &self);
         Ok(AzblobBackend {
@@ -455,7 +454,6 @@ impl Builder for AzblobBuilder {
                 client,
                 loader: cred_loader,
                 signer,
-                batch_signer,
             }),
             has_sas_token: self.sas_token.is_some(),
         })
diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs
index f9f09b6a5..acf805d3b 100644
--- a/core/src/services/azblob/core.rs
+++ b/core/src/services/azblob/core.rs
@@ -37,6 +37,8 @@ use crate::raw::*;
 use crate::*;
 
 mod constants {
+    pub const X_MS_VERSION: &str = "x-ms-version";
+
     pub const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
     pub const X_MS_COPY_SOURCE: &str = "x-ms-copy-source";
     pub const X_MS_BLOB_CACHE_CONTROL: &str = "x-ms-blob-cache-control";
@@ -58,7 +60,6 @@ pub struct AzblobCore {
     pub client: HttpClient,
     pub loader: AzureStorageLoader,
     pub signer: AzureStorageSigner,
-    pub batch_signer: AzureStorageSigner,
 }
 
 impl Debug for AzblobCore {
@@ -99,14 +100,22 @@ impl AzblobCore {
 
     pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
         let cred = self.load_credential().await?;
+        // Insert x-ms-version header for normal requests.
+        req.headers_mut().insert(
+            HeaderName::from_static(constants::X_MS_VERSION),
+            // 2022-11-02 is the version supported by Azurite V3 and
+            // used by Azure Portal. We use this version to keep
+            // most of our developers happy.
+            //
+            // In the future, we could allow users to configure this value.
+            HeaderValue::from_static("2022-11-02"),
+        );
         self.signer.sign(req, &cred).map_err(new_request_sign_error)
     }
 
     async fn batch_sign<T>(&self, req: &mut Request<T>) -> Result<()> {
         let cred = self.load_credential().await?;
-        self.batch_signer
-            .sign(req, &cred)
-            .map_err(new_request_sign_error)
+        self.signer.sign(req, &cred).map_err(new_request_sign_error)
     }
 
     #[inline]
@@ -537,8 +546,8 @@ impl AzblobCore {
 
         for (idx, path) in paths.iter().enumerate() {
             let mut req = self.azblob_delete_blob_request(path)?;
-
             self.batch_sign(&mut req).await?;
+
             multipart = multipart.part(
                 MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()),
             );
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index c4f747069..2efe228d5 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -107,6 +107,7 @@ macro_rules! behavior_write_tests {
                 test_delete_with_special_chars,
                 test_delete_not_existing,
                 test_delete_stream,
+                test_remove_one_file,
                 test_writer_write,
                 test_writer_abort,
                 test_writer_futures_copy,
@@ -1037,6 +1038,21 @@ pub async fn test_delete_not_existing(op: Operator) -> Result<()> {
     Ok(())
 }
 
+/// Remove one file
+pub async fn test_remove_one_file(op: Operator) -> Result<()> {
+    let path = uuid::Uuid::new_v4().to_string();
+    let (content, _) = gen_bytes();
+
+    op.write(&path, content).await.expect("write must succeed");
+
+    op.remove(vec![path.clone()]).await?;
+
+    // Stat it again to check.
+    assert!(!op.is_exist(&path).await?);
+
+    Ok(())
+}
+
 /// Delete via stream.
 pub async fn test_delete_stream(op: Operator) -> Result<()> {
     let dir = uuid::Uuid::new_v4().to_string();

Reply via email to