This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new a9a83f1  feat: add option to disable bulk delete for aws (#734)
a9a83f1 is described below

commit a9a83f18071d9e4c5636d379aaa0513c1e06d0d4
Author: Hengfei Yang <[email protected]>
AuthorDate: Wed Jun 3 04:12:46 2026 +0800

    feat: add option to disable bulk delete for aws (#734)
    
    * feat: add option to disable bulk delete for aws
    
    * fix(aws): correct documentation references for delete methods
    
    * test: add more unit tests
---
 src/aws/builder.rs |  35 +++++++++++
 src/aws/client.rs  | 173 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/aws/mod.rs     |  19 ++++++
 3 files changed, 227 insertions(+)

diff --git a/src/aws/builder.rs b/src/aws/builder.rs
index 85dbaae..5a6d2b4 100644
--- a/src/aws/builder.rs
+++ b/src/aws/builder.rs
@@ -181,6 +181,8 @@ pub struct AmazonS3Builder {
     conditional_put: ConfigValue<S3ConditionalPut>,
     /// Ignore tags
     disable_tagging: ConfigValue<bool>,
+    /// Disable bulk delete
+    disable_bulk_delete: ConfigValue<bool>,
     /// Encryption (See [`S3EncryptionConfigKey`])
     encryption_type: Option<ConfigValue<S3EncryptionType>>,
     encryption_kms_key_id: Option<String>,
@@ -429,6 +431,19 @@ pub enum AmazonS3ConfigKey {
     /// - `disable_tagging`
     DisableTagging,
 
+    /// Disable bulk delete (`DeleteObjects`, `POST /?delete`)
+    ///
+    /// If set to `true`, [`delete`](crate::ObjectStoreExt::delete) and
+    /// [`delete_stream`](crate::ObjectStore::delete_stream) will issue
+    /// single-object `DELETE /key` requests instead of the bulk `DeleteObjects`
+    /// API (`POST /?delete`). Use this for S3-compatible providers that do not
+    /// implement `DeleteObjects` (e.g. Alibaba Cloud OSS).
+    ///
+    /// Supported keys:
+    /// - `aws_disable_bulk_delete`
+    /// - `disable_bulk_delete`
+    DisableBulkDelete,
+
     /// Enable Support for S3 Express One Zone
     ///
     /// Supported keys:
@@ -478,6 +493,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
             Self::CopyIfNotExists => "aws_copy_if_not_exists",
             Self::ConditionalPut => "aws_conditional_put",
             Self::DisableTagging => "aws_disable_tagging",
+            Self::DisableBulkDelete => "aws_disable_bulk_delete",
             Self::RequestPayer => "aws_request_payer",
             Self::Client(opt) => opt.as_ref(),
             Self::Encryption(opt) => opt.as_ref(),
@@ -525,6 +541,7 @@ impl FromStr for AmazonS3ConfigKey {
             "aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
             "aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
             "aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
+            "aws_disable_bulk_delete" | "disable_bulk_delete" => Ok(Self::DisableBulkDelete),
             "aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
             // Backwards compatibility
             "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
@@ -672,6 +689,7 @@ impl AmazonS3Builder {
             }
             AmazonS3ConfigKey::SkipSignature => self.skip_signature.parse(value),
             AmazonS3ConfigKey::DisableTagging => self.disable_tagging.parse(value),
+            AmazonS3ConfigKey::DisableBulkDelete => self.disable_bulk_delete.parse(value),
             AmazonS3ConfigKey::CopyIfNotExists => {
                 self.copy_if_not_exists = Some(ConfigValue::Deferred(value.into()))
             }
@@ -745,6 +763,7 @@ impl AmazonS3Builder {
             }
             AmazonS3ConfigKey::ConditionalPut => Some(self.conditional_put.to_string()),
             AmazonS3ConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
+            AmazonS3ConfigKey::DisableBulkDelete => Some(self.disable_bulk_delete.to_string()),
             AmazonS3ConfigKey::RequestPayer => Some(self.request_payer.to_string()),
             AmazonS3ConfigKey::Encryption(key) => match key {
                 S3EncryptionConfigKey::ServerSideEncryption => {
@@ -1018,6 +1037,21 @@ impl AmazonS3Builder {
         self
     }
 
+    /// If set to `true`, [`delete`](crate::ObjectStoreExt::delete) and
+    /// [`delete_stream`](crate::ObjectStore::delete_stream) will issue
+    /// single-object `DELETE /key` requests instead of the bulk `DeleteObjects`
+    /// API (`POST /?delete`).
+    ///
+    /// The bulk `DeleteObjects` API is more efficient but is not implemented by
+    /// all S3-compatible providers (e.g. Alibaba Cloud OSS). Setting this to
+    /// `true` restores the single-object delete behaviour that works against
+    /// every S3-compatible provider, at the cost of throughput when deleting
+    /// many objects via [`delete_stream`](crate::ObjectStore::delete_stream).
+    pub fn with_disable_bulk_delete(mut self, disable: bool) -> Self {
+        self.disable_bulk_delete = disable.into();
+        self
+    }
+
     /// Use SSE-KMS for server side encryption.
     pub fn with_sse_kms_encryption(mut self, kms_key_id: impl Into<String>) -> Self {
         self.encryption_type = Some(ConfigValue::Parsed(S3EncryptionType::SseKms));
@@ -1241,6 +1275,7 @@ impl AmazonS3Builder {
             sign_payload: !self.unsigned_payload.get()?,
             skip_signature: self.skip_signature.get()?,
             disable_tagging: self.disable_tagging.get()?,
+            disable_bulk_delete: self.disable_bulk_delete.get()?,
             checksum,
             copy_if_not_exists,
             conditional_put: self.conditional_put.get()?,
diff --git a/src/aws/client.rs b/src/aws/client.rs
index f579a89..199f4a8 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -205,6 +205,7 @@ pub(crate) struct S3Config {
     pub sign_payload: bool,
     pub skip_signature: bool,
     pub disable_tagging: bool,
+    pub disable_bulk_delete: bool,
     pub checksum: Option<Checksum>,
     pub copy_if_not_exists: Option<S3CopyIfNotExists>,
     pub conditional_put: S3ConditionalPut,
@@ -615,6 +616,16 @@ impl S3Client {
         Ok(results)
     }
 
+    /// Make a single-object S3 Delete request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html>
+    ///
+    /// Unlike [`bulk_delete_request`](Self::bulk_delete_request), this issues a
+    /// plain `DELETE /key` request, which is part of the core S3 API and is
+    /// supported by every S3-compatible provider.
+    pub(crate) async fn delete_request(&self, path: &Path) -> Result<()> {
+        self.request(Method::DELETE, path).send().await?;
+        Ok(())
+    }
+
     /// Make an S3 Copy request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
     pub(crate) fn copy_request<'a>(&'a self, from: &Path, to: &'a Path) -> Request<'a> {
         let source = format!("{}/{}", self.config.bucket, encode_path(from));
@@ -1008,10 +1019,13 @@ fn encode_path(path: &Path) -> PercentEncode<'_> {
 mod tests {
     use super::*;
     use crate::GetOptions;
+    use crate::ObjectStore;
+    use crate::aws::{AmazonS3, AmazonS3Builder};
     use crate::client::HttpClient;
     use crate::client::get::GetClient;
     use crate::client::mock_server::MockServer;
     use crate::client::retry::RetryContext;
+    use futures_util::{StreamExt, TryStreamExt};
     use http::Response;
     use http::header::{AUTHORIZATION, CONTENT_LENGTH};
     use hyper::Request;
@@ -1049,6 +1063,7 @@ mod tests {
             retry_config: Default::default(),
             sign_payload: false,
             disable_tagging: false,
+            disable_bulk_delete: false,
             checksum: None,
             copy_if_not_exists: None,
             conditional_put: Default::default(),
@@ -1104,6 +1119,7 @@ mod tests {
             retry_config: Default::default(),
             sign_payload: false,
             disable_tagging: false,
+            disable_bulk_delete: false,
             checksum: None,
             copy_if_not_exists: None,
             conditional_put: Default::default(),
@@ -1155,6 +1171,163 @@ mod tests {
         mock.shutdown().await;
     }
 
+    /// `(method, path, query)` captured for assertion outside the mock closure.
+    ///
+    /// `MockServer` swallows panics raised inside its response handler (the
+    /// connection just resets and the S3 retry logic can still surface an `Ok`
+    /// result), so assertions placed inside the closure are silently ignored.
+    /// We capture into shared state and assert in the test body instead.
+    type CapturedRequest = (Method, String, Option<String>);
+
+    fn capture(captured: &Arc<std::sync::Mutex<Vec<CapturedRequest>>>, req: &Request<Incoming>) {
+        captured.lock().unwrap().push((
+            req.method().clone(),
+            req.uri().path().to_string(),
+            req.uri().query().map(|s| s.to_string()),
+        ));
+    }
+
+    /// Build an `AmazonS3` via the public builder so that `bucket_endpoint`
+    /// is computed by the library from the addressing-style option — i.e.
+    /// the option under test actually drives the URL the client emits.
+    fn make_store(mock: &MockServer, virtual_hosted: bool, disable_bulk_delete: bool) -> AmazonS3 {
+        AmazonS3Builder::new()
+            .with_endpoint(mock.url())
+            .with_bucket_name("test-bucket")
+            .with_region("us-east-1")
+            .with_allow_http(true)
+            .with_skip_signature(true)
+            .with_virtual_hosted_style_request(virtual_hosted)
+            .with_disable_bulk_delete(disable_bulk_delete)
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_delete_default() {
+        // Default: path-style + bulk delete enabled.
+        // `delete_stream` must issue a single `POST /{bucket}?delete`.
+        let mock = MockServer::new().await;
+        let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
+        let c = Arc::clone(&captured);
+        mock.push_fn(move |req| {
+            capture(&c, &req);
+            Response::builder()
+                .status(200)
+                .body("<DeleteResult><Deleted><Key>foo</Key></Deleted></DeleteResult>".to_string())
+                .unwrap()
+        });
+
+        let store = make_store(&mock, false, false);
+        let locations = futures_util::stream::iter(vec![Ok(Path::from("foo"))]).boxed();
+        let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
+        assert_eq!(deleted.len(), 1);
+
+        let captured = captured.lock().unwrap().clone();
+        assert_eq!(captured.len(), 1, "expected one bulk delete request");
+        assert_eq!(captured[0].0, Method::POST);
+        assert_eq!(captured[0].1, "/test-bucket");
+        assert_eq!(captured[0].2.as_deref(), Some("delete"));
+
+        mock.shutdown().await;
+    }
+
+    #[tokio::test]
+    async fn test_delete_default_with_disable_bulk() {
+        // Path-style + bulk delete disabled.
+        // `delete_stream` must fan out into `DELETE /{bucket}/{key}` (one per
+        // object, no `?delete` query).
+        let mock = MockServer::new().await;
+        let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
+        for _ in 0..2 {
+            let c = Arc::clone(&captured);
+            mock.push_fn(move |req| {
+                capture(&c, &req);
+                Response::builder().status(204).body(String::new()).unwrap()
+            });
+        }
+
+        let store = make_store(&mock, false, true);
+        let locations =
+            futures_util::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
+        let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
+        assert_eq!(deleted.len(), 2);
+
+        let mut captured = captured.lock().unwrap().clone();
+        captured.sort_by(|a, b| a.1.cmp(&b.1));
+        assert_eq!(captured.len(), 2, "expected one DELETE per object");
+        assert_eq!(
+            captured[0],
+            (Method::DELETE, "/test-bucket/bar".to_string(), None)
+        );
+        assert_eq!(
+            captured[1],
+            (Method::DELETE, "/test-bucket/foo".to_string(), None)
+        );
+
+        mock.shutdown().await;
+    }
+
+    #[tokio::test]
+    async fn test_delete_virtual_hosted() {
+        // Virtual-hosted style + bulk delete enabled.
+        // `delete_stream` must issue a single `POST /?delete` (bucket is in
+        // the host, not the path).
+        let mock = MockServer::new().await;
+        let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
+        let c = Arc::clone(&captured);
+        mock.push_fn(move |req| {
+            capture(&c, &req);
+            Response::builder()
+                .status(200)
+                .body("<DeleteResult><Deleted><Key>foo</Key></Deleted></DeleteResult>".to_string())
+                .unwrap()
+        });
+
+        let store = make_store(&mock, true, false);
+        let locations = futures_util::stream::iter(vec![Ok(Path::from("foo"))]).boxed();
+        let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
+        assert_eq!(deleted.len(), 1);
+
+        let captured = captured.lock().unwrap().clone();
+        assert_eq!(captured.len(), 1, "expected one bulk delete request");
+        assert_eq!(captured[0].0, Method::POST);
+        assert_eq!(captured[0].1, "/");
+        assert_eq!(captured[0].2.as_deref(), Some("delete"));
+
+        mock.shutdown().await;
+    }
+
+    #[tokio::test]
+    async fn test_delete_virtual_hosted_with_disable_bulk() {
+        // Virtual-hosted style + bulk delete disabled.
+        // `delete_stream` must fan out into `DELETE /{key}` (no bucket in
+        // path, no `?delete` query).
+        let mock = MockServer::new().await;
+        let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
+        for _ in 0..2 {
+            let c = Arc::clone(&captured);
+            mock.push_fn(move |req| {
+                capture(&c, &req);
+                Response::builder().status(204).body(String::new()).unwrap()
+            });
+        }
+
+        let store = make_store(&mock, true, true);
+        let locations =
+            futures_util::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
+        let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
+        assert_eq!(deleted.len(), 2);
+
+        let mut captured = captured.lock().unwrap().clone();
+        captured.sort_by(|a, b| a.1.cmp(&b.1));
+        assert_eq!(captured.len(), 2, "expected one DELETE per object");
+        assert_eq!(captured[0], (Method::DELETE, "/bar".to_string(), None));
+        assert_eq!(captured[1], (Method::DELETE, "/foo".to_string(), None));
+
+        mock.shutdown().await;
+    }
+
     #[tokio::test]
     async fn test_default_headers_signed_get_request() {
         let mock = MockServer::new().await;
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 935c653..e1cdb06 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -264,6 +264,25 @@ impl ObjectStore for AmazonS3 {
         locations: BoxStream<'static, Result<Path>>,
     ) -> BoxStream<'static, Result<Path>> {
         let client = Arc::clone(&self.client);
+
+        // Some S3-compatible providers do not implement
+        // the bulk `DeleteObjects` API (`POST /?delete`). When bulk delete is
+        // disabled, fall back to parallel single-object `DELETE /key` requests,
+        // which are part of the core S3 API supported by every provider.
+        if client.config.disable_bulk_delete {
+            return locations
+                .map(move |location| {
+                    let client = Arc::clone(&client);
+                    async move {
+                        let location = location?;
+                        client.delete_request(&location).await?;
+                        Ok(location)
+                    }
+                })
+                .buffered(20)
+                .boxed();
+        }
+
         locations
             .try_chunks(1_000)
             .map(move |locations| {

Reply via email to