This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 0646c6d  fix(memory): fix out-of-order multipart upload for in-memory store (#683)
0646c6d is described below

commit 0646c6d9eb8e302caafbb564cd3e1fe1cca886ad
Author: dentiny <[email protected]>
AuthorDate: Tue Apr 7 04:18:25 2026 -0700

    fix(memory): fix out-of-order multipart upload for in-memory store (#683)
---
 src/aws/mod.rs     |  1 +
 src/azure/mod.rs   |  1 +
 src/gcp/mod.rs     |  1 +
 src/integration.rs | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++
 src/memory.rs      |  3 ++-
 src/prefix.rs      |  1 +
 src/throttle.rs    |  1 +
 7 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 0a1c233..b3804e2 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -632,6 +632,7 @@ mod tests {
         rename_and_copy(&integration).await;
         stream_get(&integration).await;
         multipart(&integration, &integration).await;
+        multipart_put_part_out_of_order(&integration, &integration).await;
         multipart_race_condition(&integration, true).await;
         multipart_out_of_order(&integration).await;
         signing(&integration).await;
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 4460040..e6b9b9c 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -353,6 +353,7 @@ mod tests {
         stream_get(&integration).await;
         put_opts(&integration, true).await;
         multipart(&integration, &integration).await;
+        multipart_put_part_out_of_order(&integration, &integration).await;
         multipart_race_condition(&integration, false).await;
         multipart_out_of_order(&integration).await;
         signing(&integration).await;
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index e1157a8..8d7b26c 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -329,6 +329,7 @@ mod test {
             // https://github.com/fsouza/fake-gcs-server/issues/852
             stream_get(&integration).await;
             multipart(&integration, &integration).await;
+            multipart_put_part_out_of_order(&integration, &integration).await;
             multipart_race_condition(&integration, true).await;
             multipart_out_of_order(&integration).await;
             list_paginated(&integration, &integration).await;
diff --git a/src/integration.rs b/src/integration.rs
index fbbbe74..6b55bb2 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -1052,6 +1052,56 @@ pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore
     assert_eq!(meta.size, 0);
 }
 
+/// Tests that [`MultipartStore::put_part`] may be invoked with non-sequential part indices.
+pub async fn multipart_put_part_out_of_order(
+    storage: &dyn ObjectStore,
+    multipart: &dyn MultipartStore,
+) {
+    let path = Path::from("test_multipart_put_part_out_of_order");
+
+    // S3: each part except the last must be ≥ 5 MiB.
+    const MIN_MULTIPART_PART: usize = 5 * 1024 * 1024;
+    let part0_data = Bytes::from(vec![0xAA_u8; MIN_MULTIPART_PART]);
+    let part1_data = Bytes::from(vec![0xBB_u8; MIN_MULTIPART_PART]);
+    let part2_data = Bytes::from_static(b"tail");
+
+    let upload_id = multipart.create_multipart(&path).await.unwrap();
+
+    let part2 = multipart
+        .put_part(&path, &upload_id, 2, PutPayload::from(part2_data.clone()))
+        .await
+        .unwrap();
+
+    let part0 = multipart
+        .put_part(&path, &upload_id, 0, PutPayload::from(part0_data.clone()))
+        .await
+        .unwrap();
+
+    let part1 = multipart
+        .put_part(&path, &upload_id, 1, PutPayload::from(part1_data.clone()))
+        .await
+        .unwrap();
+
+    let result = multipart
+        .complete_multipart(&path, &upload_id, vec![part0, part1, part2])
+        .await
+        .unwrap();
+    assert!(result.e_tag.is_some(), "Expected e_tag in PutResult");
+
+    let data = storage.get(&path).await.unwrap().bytes().await.unwrap();
+    assert_eq!(
+        data.len(),
+        part0_data.len() + part1_data.len() + part2_data.len()
+    );
+    assert!(data[..MIN_MULTIPART_PART].iter().all(|&b| b == 0xAA));
+    assert!(
+        data[MIN_MULTIPART_PART..2 * MIN_MULTIPART_PART]
+            .iter()
+            .all(|&b| b == 0xBB)
+    );
+    assert_eq!(&data[2 * MIN_MULTIPART_PART..], part2_data.as_ref());
+}
+
 async fn delete_fixtures(storage: &DynObjectStore) {
     let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
     storage
diff --git a/src/memory.rs b/src/memory.rs
index 096b1ba..383c553 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -433,7 +433,7 @@ impl MultipartStore for InMemory {
     ) -> Result<PartId> {
         let mut storage = self.storage.write();
         let upload = storage.upload_mut(id)?;
-        if part_idx <= upload.parts.len() {
+        if part_idx >= upload.parts.len() {
             upload.parts.resize(part_idx + 1, None);
         }
         upload.parts[part_idx] = Some(payload.into());
@@ -558,6 +558,7 @@ mod tests {
         put_opts(&integration, true).await;
         multipart(&integration, &integration).await;
         put_get_attributes(&integration).await;
+        multipart_put_part_out_of_order(&integration, &integration).await;
     }
 
     #[tokio::test]
diff --git a/src/prefix.rs b/src/prefix.rs
index 2850c52..0ffb09a 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -304,6 +304,7 @@ mod tests {
         let store = PrefixStore::new(InMemory::new(), "prefix");
 
         multipart(&store, &store).await;
+        multipart_put_part_out_of_order(&store, &store).await;
         multipart_out_of_order(&store).await;
         multipart_race_condition(&store, true).await;
     }
diff --git a/src/throttle.rs b/src/throttle.rs
index 19bb991..695afe4 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -400,6 +400,7 @@ mod tests {
         copy_if_not_exists(&store).await;
         stream_get(&store).await;
         multipart(&store, &store).await;
+        multipart_put_part_out_of_order(&store, &store).await;
     }
 
     #[tokio::test]

Reply via email to