This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 0646c6d fix(memory): fix out-of-order multipart upload for in-memory store (#683)
0646c6d is described below
commit 0646c6d9eb8e302caafbb564cd3e1fe1cca886ad
Author: dentiny <[email protected]>
AuthorDate: Tue Apr 7 04:18:25 2026 -0700
fix(memory): fix out-of-order multipart upload for in-memory store (#683)
---
src/aws/mod.rs | 1 +
src/azure/mod.rs | 1 +
src/gcp/mod.rs | 1 +
src/integration.rs | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++
src/memory.rs | 3 ++-
src/prefix.rs | 1 +
src/throttle.rs | 1 +
7 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 0a1c233..b3804e2 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -632,6 +632,7 @@ mod tests {
rename_and_copy(&integration).await;
stream_get(&integration).await;
multipart(&integration, &integration).await;
+ multipart_put_part_out_of_order(&integration, &integration).await;
multipart_race_condition(&integration, true).await;
multipart_out_of_order(&integration).await;
signing(&integration).await;
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 4460040..e6b9b9c 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -353,6 +353,7 @@ mod tests {
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;
+ multipart_put_part_out_of_order(&integration, &integration).await;
multipart_race_condition(&integration, false).await;
multipart_out_of_order(&integration).await;
signing(&integration).await;
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index e1157a8..8d7b26c 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -329,6 +329,7 @@ mod test {
// https://github.com/fsouza/fake-gcs-server/issues/852
stream_get(&integration).await;
multipart(&integration, &integration).await;
+ multipart_put_part_out_of_order(&integration, &integration).await;
multipart_race_condition(&integration, true).await;
multipart_out_of_order(&integration).await;
list_paginated(&integration, &integration).await;
diff --git a/src/integration.rs b/src/integration.rs
index fbbbe74..6b55bb2 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -1052,6 +1052,56 @@ pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore
assert_eq!(meta.size, 0);
}
+/// Tests that [`MultipartStore::put_part`] may be invoked with non-sequential part indices (here 2, then 0, then 1) and that the completed object is still laid out in index order.
+pub async fn multipart_put_part_out_of_order(
+ storage: &dyn ObjectStore,
+ multipart: &dyn MultipartStore,
+) {
+ let path = Path::from("test_multipart_put_part_out_of_order");
+
+ // S3 requires every part except the last to be at least 5 MiB, so parts 0 and 1 use exactly that size.
+ const MIN_MULTIPART_PART: usize = 5 * 1024 * 1024;
+ let part0_data = Bytes::from(vec![0xAA_u8; MIN_MULTIPART_PART]);
+ let part1_data = Bytes::from(vec![0xBB_u8; MIN_MULTIPART_PART]);
+ let part2_data = Bytes::from_static(b"tail");
+
+ let upload_id = multipart.create_multipart(&path).await.unwrap();
+ // Upload the last part first: put_part then receives index 2 before any part has been stored.
+ let part2 = multipart
+ .put_part(&path, &upload_id, 2, PutPayload::from(part2_data.clone()))
+ .await
+ .unwrap();
+
+ let part0 = multipart
+ .put_part(&path, &upload_id, 0, PutPayload::from(part0_data.clone()))
+ .await
+ .unwrap();
+
+ let part1 = multipart
+ .put_part(&path, &upload_id, 1, PutPayload::from(part1_data.clone()))
+ .await
+ .unwrap();
+
+ let result = multipart
+ .complete_multipart(&path, &upload_id, vec![part0, part1, part2])
+ .await
+ .unwrap();
+ assert!(result.e_tag.is_some(), "Expected e_tag in PutResult");
+ // The stored object must be part 0, part 1, part 2 concatenated, regardless of upload order.
+ let data = storage.get(&path).await.unwrap().bytes().await.unwrap();
+ assert_eq!(
+ data.len(),
+ part0_data.len() + part1_data.len() + part2_data.len()
+ );
+ assert!(data[..MIN_MULTIPART_PART].iter().all(|&b| b == 0xAA));
+ assert!(
+ data[MIN_MULTIPART_PART..2 * MIN_MULTIPART_PART]
+ .iter()
+ .all(|&b| b == 0xBB)
+ );
+ assert_eq!(&data[2 * MIN_MULTIPART_PART..], part2_data.as_ref());
+}
+
async fn delete_fixtures(storage: &DynObjectStore) {
let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
storage
diff --git a/src/memory.rs b/src/memory.rs
index 096b1ba..383c553 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -433,7 +433,7 @@ impl MultipartStore for InMemory {
) -> Result<PartId> {
let mut storage = self.storage.write();
let upload = storage.upload_mut(id)?;
- if part_idx <= upload.parts.len() {
+ if part_idx >= upload.parts.len() {
upload.parts.resize(part_idx + 1, None);
}
upload.parts[part_idx] = Some(payload.into());
@@ -558,6 +558,7 @@ mod tests {
put_opts(&integration, true).await;
multipart(&integration, &integration).await;
put_get_attributes(&integration).await;
+ multipart_put_part_out_of_order(&integration, &integration).await;
}
#[tokio::test]
diff --git a/src/prefix.rs b/src/prefix.rs
index 2850c52..0ffb09a 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -304,6 +304,7 @@ mod tests {
let store = PrefixStore::new(InMemory::new(), "prefix");
multipart(&store, &store).await;
+ multipart_put_part_out_of_order(&store, &store).await;
multipart_out_of_order(&store).await;
multipart_race_condition(&store, true).await;
}
diff --git a/src/throttle.rs b/src/throttle.rs
index 19bb991..695afe4 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -400,6 +400,7 @@ mod tests {
copy_if_not_exists(&store).await;
stream_get(&store).await;
multipart(&store, &store).await;
+ multipart_put_part_out_of_order(&store, &store).await;
}
#[tokio::test]