This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 1a61282 Support PutMultipartOptions for MultipartStore::create_multipart for AWS/GCP. (#754)
1a61282 is described below
commit 1a612825dc6f8ccfe1d8bd871df6acae2c68527e
Author: Kevin Zhou <[email protected]>
AuthorDate: Wed Jun 17 12:54:52 2026 -0700
Support PutMultipartOptions for MultipartStore::create_multipart for AWS/GCP. (#754)
* Add create_multipart_opts
* Add is_empty for TagSet
* fix test
---------
Co-authored-by: Kevin Zhou <[email protected]>
---
src/aws/mod.rs | 9 ++++++
src/azure/mod.rs | 43 ++++++++++++++++++++++++++++-
src/gcp/mod.rs | 9 ++++++
src/integration.rs | 38 ++++++++++++++++++++++++-
src/memory.rs | 21 ++++++++++++--
src/multipart.rs | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
src/prefix.rs | 10 +++++++
src/tags.rs | 5 ++++
src/throttle.rs | 9 ++++++
9 files changed, 220 insertions(+), 5 deletions(-)
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 149d9e2..b02bd0e 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -490,6 +490,14 @@ impl MultipartStore for AmazonS3 {
.await
}
+ async fn create_multipart_opts(
+ &self,
+ path: &Path,
+ opts: PutMultipartOptions,
+ ) -> Result<MultipartId> {
+ self.client.create_multipart(path, opts).await
+ }
+
async fn put_part(
&self,
path: &Path,
@@ -703,6 +711,7 @@ mod tests {
rename_and_copy(&integration).await;
stream_get(&integration).await;
multipart(&integration, &integration).await;
+ multipart_with_opts(&integration, &integration).await;
multipart_put_part_out_of_order(&integration, &integration).await;
multipart_race_condition(&integration, true).await;
multipart_out_of_order(&integration).await;
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index b75798f..b53e031 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -27,7 +27,7 @@
//! Unused blocks will automatically be dropped after 7 days.
//!
use crate::{
- CopyMode, CopyOptions, GetOptions, GetResult, ListResult, MultipartId,
MultipartUpload,
+ CopyMode, CopyOptions, Error, GetOptions, GetResult, ListResult,
MultipartId, MultipartUpload,
ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload,
PutResult, Result,
UploadPart,
multipart::{MultipartStore, PartId},
@@ -308,6 +308,26 @@ impl MultipartStore for MicrosoftAzure {
Ok(String::new())
}
+ async fn create_multipart_opts(
+ &self,
+ path: &Path,
+ opts: PutMultipartOptions,
+ ) -> Result<MultipartId> {
+ let PutMultipartOptions {
+ tags,
+ attributes,
+ extensions: _,
+ } = opts;
+
+ if !tags.is_empty() || !attributes.is_empty() {
+ return Err(Error::NotSupported {
+ source: "`create_multipart_opts` with non-default options is
not supported by MicrosoftAzure".into(),
+ });
+ }
+
+ self.create_multipart(path).await
+ }
+
async fn put_part(
&self,
path: &Path,
@@ -353,12 +373,35 @@ mod tests {
use crate::ObjectStoreExt;
use crate::integration::*;
use crate::tests::*;
+ use crate::{Attribute, Attributes};
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use bytes::Bytes;
use std::time::{SystemTime, UNIX_EPOCH};
#[cfg(feature = "reqwest")]
+ #[tokio::test]
+ async fn azure_create_multipart_opts_rejects_attributes() {
+ let integration = MicrosoftAzureBuilder::new()
+ .with_container_name("test")
+ .with_use_emulator(true)
+ .build()
+ .unwrap();
+ let opts = PutMultipartOptions {
+ attributes: Attributes::from_iter([(Attribute::Metadata("key".into()), "value")]),
+ ..Default::default()
+ };
+
+ // The Azure impl returns NotSupported before calling create_multipart, so no request is issued.
+ let err = integration
+ .create_multipart_opts(&Path::from("test"), opts)
+ .await
+ .unwrap_err();
+
+ assert!(matches!(err, Error::NotSupported { .. }));
+ }
+
+ #[cfg(feature = "reqwest")]
#[tokio::test]
async fn azure_blob_test() {
maybe_skip_integration!();
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index fe9b290..38a92c4 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -236,6 +236,14 @@ impl MultipartStore for GoogleCloudStorage {
.await
}
+ async fn create_multipart_opts(
+ &self,
+ path: &Path,
+ opts: PutMultipartOptions,
+ ) -> Result<MultipartId> {
+ self.client.multipart_initiate(path, opts).await
+ }
+
async fn put_part(
&self,
path: &Path,
@@ -335,6 +343,7 @@ mod test {
// https://github.com/fsouza/fake-gcs-server/issues/852
stream_get(&integration).await;
multipart(&integration, &integration).await;
+ multipart_with_opts(&integration, &integration).await;
multipart_put_part_out_of_order(&integration, &integration).await;
multipart_race_condition(&integration, true).await;
multipart_out_of_order(&integration).await;
diff --git a/src/integration.rs b/src/integration.rs
index bcd4dca..b101fc6 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -29,7 +29,8 @@ use crate::multipart::MultipartStore;
use crate::path::Path;
use crate::{
Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange,
MultipartUpload,
- ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion,
WriteMultipart,
+ ObjectStore, ObjectStoreExt, PutMode, PutMultipartOptions, PutPayload,
UpdateVersion,
+ WriteMultipart,
};
use bytes::Bytes;
use futures_util::stream::FuturesUnordered;
@@ -1053,6 +1054,41 @@ pub async fn multipart(storage: &dyn ObjectStore,
multipart: &dyn MultipartStore
assert_eq!(meta.size, 0);
}
+/// Tests [`MultipartStore::create_multipart_opts`]
+pub async fn multipart_with_opts(storage: &dyn ObjectStore, multipart: &dyn
MultipartStore) {
+ let path = Path::from("test_multipart_with_opts");
+ let chunk_size = 5 * 1024 * 1024;
+ let chunks = get_chunks(chunk_size, 2);
+ let attributes =
+ Attributes::from_iter([(Attribute::Metadata("test_key".into()),
"test_value")]);
+ let opts = PutMultipartOptions {
+ attributes: attributes.clone(),
+ ..Default::default()
+ };
+
+ let id = multipart.create_multipart_opts(&path, opts).await.unwrap();
+
+ let parts: Vec<_> = futures_util::stream::iter(chunks)
+ .enumerate()
+ .map(|(idx, b)| multipart.put_part(&path, &id, idx, b.into()))
+ .buffered(2)
+ .try_collect()
+ .await
+ .unwrap();
+
+ multipart
+ .complete_multipart(&path, &id, parts)
+ .await
+ .unwrap();
+
+ let result = storage.get(&path).await.unwrap();
+ assert_eq!(result.meta.size, chunk_size as u64 * 2);
+ // Some providers add default attributes, e.g. S3 may report a default
Content-Type.
+ for (attribute, value) in &attributes {
+ assert_eq!(result.attributes.get(attribute), Some(value));
+ }
+}
+
/// Tests that [`MultipartStore::put_part`] may be invoked with non-sequential
part indices.
pub async fn multipart_put_part_out_of_order(
storage: &dyn ObjectStore,
diff --git a/src/memory.rs b/src/memory.rs
index abc08d5..cbdcbca 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -116,6 +116,7 @@ struct Storage {
#[derive(Debug, Default, Clone)]
struct PartStorage {
+ attributes: Attributes,
parts: Vec<Option<Bytes>>,
}
@@ -420,10 +421,25 @@ impl ObjectStore for InMemory {
#[async_trait]
impl MultipartStore for InMemory {
async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
+ self.create_multipart_opts(_path, PutMultipartOptions::default())
+ .await
+ }
+
+ async fn create_multipart_opts(
+ &self,
+ _path: &Path,
+ opts: PutMultipartOptions,
+ ) -> Result<MultipartId> {
let mut storage = self.storage.write();
let etag = storage.next_etag;
storage.next_etag += 1;
- storage.uploads.insert(etag, Default::default());
+ storage.uploads.insert(
+ etag,
+ PartStorage {
+ attributes: opts.attributes,
+ parts: Default::default(),
+ },
+ );
Ok(etag.to_string())
}
@@ -462,7 +478,7 @@ impl MultipartStore for InMemory {
for x in &upload.parts {
buf.extend_from_slice(x.as_ref().unwrap())
}
- let etag = storage.insert(path, buf.into(), Default::default());
+ let etag = storage.insert(path, buf.into(), upload.attributes);
Ok(PutResult {
e_tag: Some(etag.to_string()),
version: None,
@@ -562,6 +578,7 @@ mod tests {
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;
+ multipart_with_opts(&integration, &integration).await;
put_get_attributes(&integration).await;
multipart_put_part_out_of_order(&integration, &integration).await;
}
diff --git a/src/multipart.rs b/src/multipart.rs
index c084725..90db576 100644
--- a/src/multipart.rs
+++ b/src/multipart.rs
@@ -24,7 +24,7 @@
use async_trait::async_trait;
use crate::path::Path;
-use crate::{MultipartId, PutPayload, PutResult, Result};
+use crate::{Error, MultipartId, PutMultipartOptions, PutPayload, PutResult,
Result};
/// Represents a part of a file that has been successfully uploaded in a
multipart upload process.
#[derive(Debug, Clone)]
@@ -46,6 +46,30 @@ pub trait MultipartStore: Send + Sync + 'static {
/// Creates a new multipart upload, returning the [`MultipartId`]
async fn create_multipart(&self, path: &Path) -> Result<MultipartId>;
+ /// Creates a new multipart upload with the given options, returning the
[`MultipartId`]
+ ///
+ /// This allows callers using the low-level multipart API to provide
object attributes,
+ /// tags, or implementation-specific extensions when initiating the upload.
+ async fn create_multipart_opts(
+ &self,
+ path: &Path,
+ opts: PutMultipartOptions,
+ ) -> Result<MultipartId> {
+ let PutMultipartOptions {
+ tags,
+ attributes,
+ extensions: _,
+ } = opts;
+
+ if !tags.is_empty() || !attributes.is_empty() {
+ return Err(Error::NotSupported {
+ source: "create_multipart_opts with non-default
options".into(),
+ });
+ }
+
+ self.create_multipart(path).await
+ }
+
/// Uploads a new part with index `part_idx`
///
/// `part_idx` should be an integer in the range `0..N` where `N` is the
number of
@@ -82,3 +106,58 @@ pub trait MultipartStore: Send + Sync + 'static {
/// Aborts a multipart upload
async fn abort_multipart(&self, path: &Path, id: &MultipartId) ->
Result<()>;
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::Extensions;
+
+ struct TestMultipartStore;
+
+ #[async_trait]
+ impl MultipartStore for TestMultipartStore {
+ async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
+ Ok("test".into())
+ }
+
+ async fn put_part(
+ &self,
+ _path: &Path,
+ _id: &MultipartId,
+ _part_idx: usize,
+ _data: PutPayload,
+ ) -> Result<PartId> {
+ unreachable!()
+ }
+
+ async fn complete_multipart(
+ &self,
+ _path: &Path,
+ _id: &MultipartId,
+ _parts: Vec<PartId>,
+ ) -> Result<PutResult> {
+ unreachable!()
+ }
+
+ async fn abort_multipart(&self, _path: &Path, _id: &MultipartId) ->
Result<()> {
+ unreachable!()
+ }
+ }
+
+ #[tokio::test]
+ async fn default_create_multipart_opts_ignores_extensions() {
+ let mut extensions = Extensions::new();
+ extensions.insert("extension");
+ let opts = PutMultipartOptions {
+ extensions,
+ ..Default::default()
+ };
+
+ let id = TestMultipartStore
+ .create_multipart_opts(&Path::from("test"), opts)
+ .await
+ .unwrap();
+
+ assert_eq!(id, "test");
+ }
+}
diff --git a/src/prefix.rs b/src/prefix.rs
index 2b94b48..1f53237 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -210,6 +210,15 @@ impl<T: MultipartStore> MultipartStore for PrefixStore<T> {
self.inner.create_multipart(&full_path).await
}
+ async fn create_multipart_opts(
+ &self,
+ path: &Path,
+ opts: PutMultipartOptions,
+ ) -> Result<MultipartId> {
+ let full_path = self.full_path(path);
+ self.inner.create_multipart_opts(&full_path, opts).await
+ }
+
async fn put_part(
&self,
path: &Path,
@@ -392,6 +401,7 @@ mod tests {
let store = PrefixStore::new(InMemory::new(), "prefix");
multipart(&store, &store).await;
+ multipart_with_opts(&store, &store).await;
multipart_put_part_out_of_order(&store, &store).await;
multipart_out_of_order(&store).await;
multipart_race_condition(&store, true).await;
diff --git a/src/tags.rs b/src/tags.rs
index fa6e591..4c7f184 100644
--- a/src/tags.rs
+++ b/src/tags.rs
@@ -41,6 +41,11 @@ impl TagSet {
pub fn encoded(&self) -> &str {
&self.0
}
+
+ /// Return whether this [`TagSet`] contains any tags
+ pub fn is_empty(&self) -> bool {
+ self.0.is_empty()
+ }
}
#[cfg(test)]
diff --git a/src/throttle.rs b/src/throttle.rs
index 695afe4..fd8bedb 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -310,6 +310,14 @@ impl<T: MultipartStore> MultipartStore for
ThrottledStore<T> {
self.inner.create_multipart(path).await
}
+ async fn create_multipart_opts(
+ &self,
+ path: &Path,
+ opts: PutMultipartOptions,
+ ) -> Result<MultipartId> {
+ self.inner.create_multipart_opts(path, opts).await
+ }
+
async fn put_part(
&self,
path: &Path,
@@ -400,6 +408,7 @@ mod tests {
copy_if_not_exists(&store).await;
stream_get(&store).await;
multipart(&store, &store).await;
+ multipart_with_opts(&store, &store).await;
multipart_put_part_out_of_order(&store, &store).await;
}