This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a61282  Support PutMultipartOptions for MultipartStore::create_multipart for AWS/GCP. (#754)
1a61282 is described below

commit 1a612825dc6f8ccfe1d8bd871df6acae2c68527e
Author: Kevin Zhou <[email protected]>
AuthorDate: Wed Jun 17 12:54:52 2026 -0700

    Support PutMultipartOptions for MultipartStore::create_multipart for AWS/GCP. (#754)
    
    * Add create_multipart_opts
    
    * Add is_empty for TagSet
    
    * fix test
    
    ---------
    
    Co-authored-by: Kevin Zhou <[email protected]>
---
 src/aws/mod.rs     |  9 ++++++
 src/azure/mod.rs   | 43 ++++++++++++++++++++++++++++-
 src/gcp/mod.rs     |  9 ++++++
 src/integration.rs | 38 ++++++++++++++++++++++++-
 src/memory.rs      | 21 ++++++++++++--
 src/multipart.rs   | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
 src/prefix.rs      | 10 +++++++
 src/tags.rs        |  5 ++++
 src/throttle.rs    |  9 ++++++
 9 files changed, 220 insertions(+), 5 deletions(-)

diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 149d9e2..b02bd0e 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -490,6 +490,14 @@ impl MultipartStore for AmazonS3 {
             .await
     }
 
+    async fn create_multipart_opts(
+        &self,
+        path: &Path,
+        opts: PutMultipartOptions,
+    ) -> Result<MultipartId> {
+        self.client.create_multipart(path, opts).await
+    }
+
     async fn put_part(
         &self,
         path: &Path,
@@ -703,6 +711,7 @@ mod tests {
         rename_and_copy(&integration).await;
         stream_get(&integration).await;
         multipart(&integration, &integration).await;
+        multipart_with_opts(&integration, &integration).await;
         multipart_put_part_out_of_order(&integration, &integration).await;
         multipart_race_condition(&integration, true).await;
         multipart_out_of_order(&integration).await;
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index b75798f..b53e031 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -27,7 +27,7 @@
 //! Unused blocks will automatically be dropped after 7 days.
 //!
 use crate::{
-    CopyMode, CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
+    CopyMode, CopyOptions, Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
     ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
     UploadPart,
     multipart::{MultipartStore, PartId},
@@ -308,6 +308,26 @@ impl MultipartStore for MicrosoftAzure {
         Ok(String::new())
     }
 
+    async fn create_multipart_opts(
+        &self,
+        path: &Path,
+        opts: PutMultipartOptions,
+    ) -> Result<MultipartId> {
+        let PutMultipartOptions {
+            tags,
+            attributes,
+            extensions: _,
+        } = opts;
+
+        if !tags.is_empty() || !attributes.is_empty() {
+            return Err(Error::NotSupported {
+                source: "`create_multipart_opts` with non-default options is not supported by MicrosoftAzure".into(),
+            });
+        }
+
+        self.create_multipart(path).await
+    }
+
     async fn put_part(
         &self,
         path: &Path,
@@ -353,12 +373,33 @@ mod tests {
     use crate::ObjectStoreExt;
     use crate::integration::*;
     use crate::tests::*;
+    use crate::{Attribute, Attributes};
     use base64::Engine;
     use base64::prelude::BASE64_STANDARD;
     use bytes::Bytes;
     use std::time::{SystemTime, UNIX_EPOCH};
 
     #[cfg(feature = "reqwest")]
+    #[tokio::test]
+    async fn azure_create_multipart_opts_rejects_attributes() {
+        let integration = MicrosoftAzureBuilder::new()
+            .with_container_name("test")
+            .with_use_emulator(true)
+            .build()
+            .unwrap();
+        let opts = PutMultipartOptions {
+            attributes: Attributes::from_iter([(Attribute::Metadata("key".into()), "value")]),
+            ..Default::default()
+        };
+
+        let err = integration
+            .create_multipart_opts(&Path::from("test"), opts)
+            .await
+            .unwrap_err();
+
+        assert!(matches!(err, Error::NotSupported { .. }));
+    }
+
     #[tokio::test]
     async fn azure_blob_test() {
         maybe_skip_integration!();
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index fe9b290..38a92c4 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -236,6 +236,14 @@ impl MultipartStore for GoogleCloudStorage {
             .await
     }
 
+    async fn create_multipart_opts(
+        &self,
+        path: &Path,
+        opts: PutMultipartOptions,
+    ) -> Result<MultipartId> {
+        self.client.multipart_initiate(path, opts).await
+    }
+
     async fn put_part(
         &self,
         path: &Path,
@@ -335,6 +343,7 @@ mod test {
             // https://github.com/fsouza/fake-gcs-server/issues/852
             stream_get(&integration).await;
             multipart(&integration, &integration).await;
+            multipart_with_opts(&integration, &integration).await;
             multipart_put_part_out_of_order(&integration, &integration).await;
             multipart_race_condition(&integration, true).await;
             multipart_out_of_order(&integration).await;
diff --git a/src/integration.rs b/src/integration.rs
index bcd4dca..b101fc6 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -29,7 +29,8 @@ use crate::multipart::MultipartStore;
 use crate::path::Path;
 use crate::{
     Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange, MultipartUpload,
-    ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, WriteMultipart,
+    ObjectStore, ObjectStoreExt, PutMode, PutMultipartOptions, PutPayload, UpdateVersion,
+    WriteMultipart,
 };
 use bytes::Bytes;
 use futures_util::stream::FuturesUnordered;
@@ -1053,6 +1054,41 @@ pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore
     assert_eq!(meta.size, 0);
 }
 
+/// Tests [`MultipartStore::create_multipart_opts`]
+pub async fn multipart_with_opts(storage: &dyn ObjectStore, multipart: &dyn MultipartStore) {
+    let path = Path::from("test_multipart_with_opts");
+    let chunk_size = 5 * 1024 * 1024;
+    let chunks = get_chunks(chunk_size, 2);
+    let attributes =
+        Attributes::from_iter([(Attribute::Metadata("test_key".into()), "test_value")]);
+    let opts = PutMultipartOptions {
+        attributes: attributes.clone(),
+        ..Default::default()
+    };
+
+    let id = multipart.create_multipart_opts(&path, opts).await.unwrap();
+
+    let parts: Vec<_> = futures_util::stream::iter(chunks)
+        .enumerate()
+        .map(|(idx, b)| multipart.put_part(&path, &id, idx, b.into()))
+        .buffered(2)
+        .try_collect()
+        .await
+        .unwrap();
+
+    multipart
+        .complete_multipart(&path, &id, parts)
+        .await
+        .unwrap();
+
+    let result = storage.get(&path).await.unwrap();
+    assert_eq!(result.meta.size, chunk_size as u64 * 2);
+    // Some providers add default attributes, e.g. S3 may report a default Content-Type.
+    for (attribute, value) in &attributes {
+        assert_eq!(result.attributes.get(attribute), Some(value));
+    }
+}
+
 /// Tests that [`MultipartStore::put_part`] may be invoked with non-sequential part indices.
 pub async fn multipart_put_part_out_of_order(
     storage: &dyn ObjectStore,
diff --git a/src/memory.rs b/src/memory.rs
index abc08d5..cbdcbca 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -116,6 +116,7 @@ struct Storage {
 
 #[derive(Debug, Default, Clone)]
 struct PartStorage {
+    attributes: Attributes,
     parts: Vec<Option<Bytes>>,
 }
 
@@ -420,10 +421,25 @@ impl ObjectStore for InMemory {
 #[async_trait]
 impl MultipartStore for InMemory {
     async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
+        self.create_multipart_opts(_path, PutMultipartOptions::default())
+            .await
+    }
+
+    async fn create_multipart_opts(
+        &self,
+        _path: &Path,
+        opts: PutMultipartOptions,
+    ) -> Result<MultipartId> {
         let mut storage = self.storage.write();
         let etag = storage.next_etag;
         storage.next_etag += 1;
-        storage.uploads.insert(etag, Default::default());
+        storage.uploads.insert(
+            etag,
+            PartStorage {
+                attributes: opts.attributes,
+                parts: Default::default(),
+            },
+        );
         Ok(etag.to_string())
     }
 
@@ -462,7 +478,7 @@ impl MultipartStore for InMemory {
         for x in &upload.parts {
             buf.extend_from_slice(x.as_ref().unwrap())
         }
-        let etag = storage.insert(path, buf.into(), Default::default());
+        let etag = storage.insert(path, buf.into(), upload.attributes);
         Ok(PutResult {
             e_tag: Some(etag.to_string()),
             version: None,
@@ -562,6 +578,7 @@ mod tests {
         stream_get(&integration).await;
         put_opts(&integration, true).await;
         multipart(&integration, &integration).await;
+        multipart_with_opts(&integration, &integration).await;
         put_get_attributes(&integration).await;
         multipart_put_part_out_of_order(&integration, &integration).await;
     }
diff --git a/src/multipart.rs b/src/multipart.rs
index c084725..90db576 100644
--- a/src/multipart.rs
+++ b/src/multipart.rs
@@ -24,7 +24,7 @@
 use async_trait::async_trait;
 
 use crate::path::Path;
-use crate::{MultipartId, PutPayload, PutResult, Result};
+use crate::{Error, MultipartId, PutMultipartOptions, PutPayload, PutResult, Result};
 
 /// Represents a part of a file that has been successfully uploaded in a multipart upload process.
 #[derive(Debug, Clone)]
@@ -46,6 +46,30 @@ pub trait MultipartStore: Send + Sync + 'static {
     /// Creates a new multipart upload, returning the [`MultipartId`]
     async fn create_multipart(&self, path: &Path) -> Result<MultipartId>;
 
+    /// Creates a new multipart upload with the given options, returning the [`MultipartId`]
+    ///
+    /// This allows callers using the low-level multipart API to provide object attributes,
+    /// tags, or implementation-specific extensions when initiating the upload.
+    async fn create_multipart_opts(
+        &self,
+        path: &Path,
+        opts: PutMultipartOptions,
+    ) -> Result<MultipartId> {
+        let PutMultipartOptions {
+            tags,
+            attributes,
+            extensions: _,
+        } = opts;
+
+        if !tags.is_empty() || !attributes.is_empty() {
+            return Err(Error::NotSupported {
+                source: "create_multipart_opts with non-default options".into(),
+            });
+        }
+
+        self.create_multipart(path).await
+    }
+
     /// Uploads a new part with index `part_idx`
     ///
     /// `part_idx` should be an integer in the range `0..N` where `N` is the number of
@@ -82,3 +106,58 @@ pub trait MultipartStore: Send + Sync + 'static {
     /// Aborts a multipart upload
     async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()>;
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Extensions;
+
+    struct TestMultipartStore;
+
+    #[async_trait]
+    impl MultipartStore for TestMultipartStore {
+        async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
+            Ok("test".into())
+        }
+
+        async fn put_part(
+            &self,
+            _path: &Path,
+            _id: &MultipartId,
+            _part_idx: usize,
+            _data: PutPayload,
+        ) -> Result<PartId> {
+            unreachable!()
+        }
+
+        async fn complete_multipart(
+            &self,
+            _path: &Path,
+            _id: &MultipartId,
+            _parts: Vec<PartId>,
+        ) -> Result<PutResult> {
+            unreachable!()
+        }
+
+        async fn abort_multipart(&self, _path: &Path, _id: &MultipartId) -> Result<()> {
+            unreachable!()
+        }
+    }
+
+    #[tokio::test]
+    async fn default_create_multipart_opts_ignores_extensions() {
+        let mut extensions = Extensions::new();
+        extensions.insert("extension");
+        let opts = PutMultipartOptions {
+            extensions,
+            ..Default::default()
+        };
+
+        let id = TestMultipartStore
+            .create_multipart_opts(&Path::from("test"), opts)
+            .await
+            .unwrap();
+
+        assert_eq!(id, "test");
+    }
+}
diff --git a/src/prefix.rs b/src/prefix.rs
index 2b94b48..1f53237 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -210,6 +210,15 @@ impl<T: MultipartStore> MultipartStore for PrefixStore<T> {
         self.inner.create_multipart(&full_path).await
     }
 
+    async fn create_multipart_opts(
+        &self,
+        path: &Path,
+        opts: PutMultipartOptions,
+    ) -> Result<MultipartId> {
+        let full_path = self.full_path(path);
+        self.inner.create_multipart_opts(&full_path, opts).await
+    }
+
     async fn put_part(
         &self,
         path: &Path,
@@ -392,6 +401,7 @@ mod tests {
         let store = PrefixStore::new(InMemory::new(), "prefix");
 
         multipart(&store, &store).await;
+        multipart_with_opts(&store, &store).await;
         multipart_put_part_out_of_order(&store, &store).await;
         multipart_out_of_order(&store).await;
         multipart_race_condition(&store, true).await;
diff --git a/src/tags.rs b/src/tags.rs
index fa6e591..4c7f184 100644
--- a/src/tags.rs
+++ b/src/tags.rs
@@ -41,6 +41,11 @@ impl TagSet {
     pub fn encoded(&self) -> &str {
         &self.0
     }
+
+    /// Return whether this [`TagSet`] contains any tags
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
 }
 
 #[cfg(test)]
diff --git a/src/throttle.rs b/src/throttle.rs
index 695afe4..fd8bedb 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -310,6 +310,14 @@ impl<T: MultipartStore> MultipartStore for ThrottledStore<T> {
         self.inner.create_multipart(path).await
     }
 
+    async fn create_multipart_opts(
+        &self,
+        path: &Path,
+        opts: PutMultipartOptions,
+    ) -> Result<MultipartId> {
+        self.inner.create_multipart_opts(path, opts).await
+    }
+
     async fn put_part(
         &self,
         path: &Path,
@@ -400,6 +408,7 @@ mod tests {
         copy_if_not_exists(&store).await;
         stream_get(&store).await;
         multipart(&store, &store).await;
+        multipart_with_opts(&store, &store).await;
         multipart_put_part_out_of_order(&store, &store).await;
     }
 

Reply via email to