alamb commented on code in PR #5500:
URL: https://github.com/apache/arrow-rs/pull/5500#discussion_r1530972982


##########
object_store/src/upload.rs:
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PutResult, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use tokio::task::JoinSet;
+
+/// An upload part request
+pub type UploadPart = BoxFuture<'static, Result<()>>;
+
+/// A trait allowing writing an object in fixed size chunks
+///
+/// Consecutive chunks of data can be written by calling 
[`MultipartUpload::put_part`] and polling
+/// the returned futures to completion. Multiple futures returned by 
[`MultipartUpload::put_part`]
+/// may be polled in parallel, allowing for concurrent uploads.
+///
+/// Once all part uploads have been polled to completion, the upload can be 
completed by
+/// calling [`MultipartUpload::complete`]. This will make the entire uploaded 
object visible
+/// as an atomic operation.It is implementation behind behaviour if 
[`MultipartUpload::complete`]
+/// is called before all [`UploadPart`] have been polled to completion.
+#[async_trait]
+pub trait MultipartUpload: Send + std::fmt::Debug {
+    /// Upload the next part
+    ///
+    /// Most stores require that all parts excluding the last are at least 5 
MiB, and some
+    /// further require that all parts excluding the last be the same size, 
e.g. [R2].
+    /// Clients wanting to maximise compatibility should therefore perform 
writes in
+    /// fixed size blocks larger than 5 MiB.
+    ///
+    /// Implementations may invoke this method multiple times and then await 
on the
+    /// returned futures in parallel
+    ///
+    /// ```no_run
+    /// # use futures::StreamExt;
+    /// # use object_store::MultipartUpload;
+    /// #
+    /// # async fn test() {
+    /// #
+    /// let mut upload: Box<&dyn MultipartUpload> = todo!();
+    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
+    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
+    /// futures::future::try_join(p1, p2).await.unwrap();
+    /// upload.complete().await.unwrap();
+    /// # }
+    /// ```
+    ///
+    /// [R2]: 
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+    fn put_part(&mut self, data: Bytes) -> UploadPart;
+
+    /// Complete the multipart upload
+    ///
+    /// It is implementation defined behaviour if this method is called before 
polling
+    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to 
completion. Additionally,
+    /// it is implementation defined behaviour to call 
[`MultipartUpload::complete`]
+    /// on an already completed or aborted [`MultipartUpload`].
+    async fn complete(&mut self) -> Result<PutResult>;
+
+    /// Abort the multipart upload
+    ///
+    /// If a [`MultipartUpload`] is dropped without calling 
[`MultipartUpload::complete`],
+    /// some implementations will automatically reap any uploaded parts. 
However,
+    /// this is not always possible, e.g. for S3 and GCS. 
[`MultipartUpload::abort`] can
+    /// therefore be invoked to perform this cleanup.

Review Comment:
   I think the term `reap`, while more literary, has a greater potential to be 
misunderstood. I also think it would help to explain why it is not always 
possible. 
   
   Here is a suggestion
   
   ```suggestion
       /// If a [`MultipartUpload`] is dropped without calling 
[`MultipartUpload::complete`],
       /// some object stores will automatically clean up any previously 
uploaded parts. 
       /// However, some stores such as  for S3 and GCS do not perform 
automatic cleanup and 
       /// in such cases, [`MultipartUpload::abort`] manually invokes this 
cleanup.
   ```



##########
object_store/src/upload.rs:
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PutResult, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use tokio::task::JoinSet;
+
+/// An upload part request
+pub type UploadPart = BoxFuture<'static, Result<()>>;
+
+/// A trait allowing writing an object in fixed size chunks
+///
+/// Consecutive chunks of data can be written by calling 
[`MultipartUpload::put_part`] and polling
+/// the returned futures to completion. Multiple futures returned by 
[`MultipartUpload::put_part`]
+/// may be polled in parallel, allowing for concurrent uploads.
+///
+/// Once all part uploads have been polled to completion, the upload can be 
completed by
+/// calling [`MultipartUpload::complete`]. This will make the entire uploaded 
object visible
+/// as an atomic operation.It is implementation behind behaviour if 
[`MultipartUpload::complete`]
+/// is called before all [`UploadPart`] have been polled to completion.
+#[async_trait]
+pub trait MultipartUpload: Send + std::fmt::Debug {
+    /// Upload the next part
+    ///
+    /// Most stores require that all parts excluding the last are at least 5 
MiB, and some
+    /// further require that all parts excluding the last be the same size, 
e.g. [R2].
+    /// Clients wanting to maximise compatibility should therefore perform 
writes in
+    /// fixed size blocks larger than 5 MiB.
+    ///
+    /// Implementations may invoke this method multiple times and then await 
on the
+    /// returned futures in parallel
+    ///
+    /// ```no_run
+    /// # use futures::StreamExt;
+    /// # use object_store::MultipartUpload;
+    /// #
+    /// # async fn test() {
+    /// #
+    /// let mut upload: Box<&dyn MultipartUpload> = todo!();
+    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
+    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
+    /// futures::future::try_join(p1, p2).await.unwrap();
+    /// upload.complete().await.unwrap();
+    /// # }
+    /// ```
+    ///
+    /// [R2]: 
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+    fn put_part(&mut self, data: Bytes) -> UploadPart;
+
+    /// Complete the multipart upload
+    ///
+    /// It is implementation defined behaviour if this method is called before 
polling
+    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to 
completion. Additionally,
+    /// it is implementation defined behaviour to call 
[`MultipartUpload::complete`]
+    /// on an already completed or aborted [`MultipartUpload`].
+    async fn complete(&mut self) -> Result<PutResult>;
+
+    /// Abort the multipart upload
+    ///
+    /// If a [`MultipartUpload`] is dropped without calling 
[`MultipartUpload::complete`],
+    /// some implementations will automatically reap any uploaded parts. 
However,
+    /// this is not always possible, e.g. for S3 and GCS. 
[`MultipartUpload::abort`] can
+    /// therefore be invoked to perform this cleanup.
+    ///
+    /// It is recommended that where possible users configure lifecycle rules
+    /// to automatically reap unused parts older than some threshold, as this
+    /// will more reliably handle different failure modes. See [crate::aws] and
+    /// [crate::gcp] for more information.
+    ///
+    /// It is implementation defined behaviour to call 
[`MultipartUpload::abort`]
+    /// on an already completed or aborted [`MultipartUpload`]
+    async fn abort(&mut self) -> Result<()>;
+}
+
+/// A synchronous write API for uploading data in parallel in fixed size chunks
+///
+/// Makes use of [`JoinSet`] under the hood to multiplex upload tasks,
+/// avoiding issues caused by sharing a single tokio's cooperative task
+/// budget across multiple IO operations.
+///
+/// The design also takes inspiration from [`Sink`] with 
[`WriteMultipart::wait_for_capacity`]
+/// allowing back pressure on producers, prior to buffering the next part. 
However, unlike
+/// [`Sink`] this back pressure is optional, allowing integration with 
synchronous producers
+///
+/// [`Sink`]: futures::sink::Sink
+#[derive(Debug)]
+pub struct WriteMultipart {
+    upload: Box<dyn MultipartUpload>,
+
+    buffer: Vec<u8>,
+
+    tasks: JoinSet<Result<()>>,
+}
+
+impl WriteMultipart {
+    /// Create a new [`WriteMultipart`]
+    pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
+        Self::new_with_capacity(upload, 5 * 1024 * 1024)

Review Comment:
   nit: It might be nice to make this a named constant (mostly so you can 
attach a docstring to the constant and refer to it)



##########
object_store/src/lib.rs:
##########
@@ -269,12 +269,11 @@
 //!
 //! #  Multipart Upload
 //!
-//! Use the [`ObjectStore::put_multipart`] method to atomically write a large 
amount of data,
-//! with implementations automatically handling parallel, chunked upload where 
appropriate.
+//! Use the [`ObjectStore::put_multipart`] method to atomically write a large 
amount of data

Review Comment:
   Can we also add an example using `BufWriter` here (to drive people to use 
that unless they really need to use the multi part API themselves)? It has the 
nice property that it does put/put-mulitpart dynamically. '
   
   



##########
object_store/src/upload.rs:
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PutResult, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use tokio::task::JoinSet;
+
+/// An upload part request
+pub type UploadPart = BoxFuture<'static, Result<()>>;
+
+/// A trait allowing writing an object in fixed size chunks
+///
+/// Consecutive chunks of data can be written by calling 
[`MultipartUpload::put_part`] and polling
+/// the returned futures to completion. Multiple futures returned by 
[`MultipartUpload::put_part`]
+/// may be polled in parallel, allowing for concurrent uploads.
+///
+/// Once all part uploads have been polled to completion, the upload can be 
completed by
+/// calling [`MultipartUpload::complete`]. This will make the entire uploaded 
object visible
+/// as an atomic operation.It is implementation behind behaviour if 
[`MultipartUpload::complete`]
+/// is called before all [`UploadPart`] have been polled to completion.
+#[async_trait]
+pub trait MultipartUpload: Send + std::fmt::Debug {
+    /// Upload the next part
+    ///
+    /// Most stores require that all parts excluding the last are at least 5 
MiB, and some
+    /// further require that all parts excluding the last be the same size, 
e.g. [R2].
+    /// Clients wanting to maximise compatibility should therefore perform 
writes in
+    /// fixed size blocks larger than 5 MiB.
+    ///
+    /// Implementations may invoke this method multiple times and then await 
on the
+    /// returned futures in parallel
+    ///
+    /// ```no_run
+    /// # use futures::StreamExt;
+    /// # use object_store::MultipartUpload;
+    /// #
+    /// # async fn test() {
+    /// #
+    /// let mut upload: Box<&dyn MultipartUpload> = todo!();
+    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
+    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
+    /// futures::future::try_join(p1, p2).await.unwrap();
+    /// upload.complete().await.unwrap();
+    /// # }
+    /// ```
+    ///
+    /// [R2]: 
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+    fn put_part(&mut self, data: Bytes) -> UploadPart;
+
+    /// Complete the multipart upload
+    ///
+    /// It is implementation defined behaviour if this method is called before 
polling
+    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to 
completion. Additionally,
+    /// it is implementation defined behaviour to call 
[`MultipartUpload::complete`]
+    /// on an already completed or aborted [`MultipartUpload`].
+    async fn complete(&mut self) -> Result<PutResult>;
+
+    /// Abort the multipart upload
+    ///
+    /// If a [`MultipartUpload`] is dropped without calling 
[`MultipartUpload::complete`],
+    /// some implementations will automatically reap any uploaded parts. 
However,
+    /// this is not always possible, e.g. for S3 and GCS. 
[`MultipartUpload::abort`] can
+    /// therefore be invoked to perform this cleanup.
+    ///
+    /// It is recommended that where possible users configure lifecycle rules
+    /// to automatically reap unused parts older than some threshold, as this
+    /// will more reliably handle different failure modes. See [crate::aws] and
+    /// [crate::gcp] for more information.
+    ///
+    /// It is implementation defined behaviour to call 
[`MultipartUpload::abort`]
+    /// on an already completed or aborted [`MultipartUpload`]
+    async fn abort(&mut self) -> Result<()>;

Review Comment:
   This came up downstream as well in DataFusion 
https://github.com/apache/arrow-datafusion/pull/9648#discussion_r1530529422
   
   My opinion is that we should include it in the API and appropriately caveat 
/ explain why it is not always possible. I offered a suggestion above



##########
object_store/src/upload.rs:
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PutResult, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use tokio::task::JoinSet;
+
+/// An upload part request
+pub type UploadPart = BoxFuture<'static, Result<()>>;
+
+/// A trait allowing writing an object in fixed size chunks
+///
+/// Consecutive chunks of data can be written by calling 
[`MultipartUpload::put_part`] and polling
+/// the returned futures to completion. Multiple futures returned by 
[`MultipartUpload::put_part`]
+/// may be polled in parallel, allowing for concurrent uploads.
+///
+/// Once all part uploads have been polled to completion, the upload can be 
completed by
+/// calling [`MultipartUpload::complete`]. This will make the entire uploaded 
object visible
+/// as an atomic operation.It is implementation behind behaviour if 
[`MultipartUpload::complete`]
+/// is called before all [`UploadPart`] have been polled to completion.
+#[async_trait]
+pub trait MultipartUpload: Send + std::fmt::Debug {
+    /// Upload the next part
+    ///
+    /// Most stores require that all parts excluding the last are at least 5 
MiB, and some
+    /// further require that all parts excluding the last be the same size, 
e.g. [R2].
+    /// Clients wanting to maximise compatibility should therefore perform 
writes in
+    /// fixed size blocks larger than 5 MiB.
+    ///
+    /// Implementations may invoke this method multiple times and then await 
on the
+    /// returned futures in parallel
+    ///
+    /// ```no_run
+    /// # use futures::StreamExt;
+    /// # use object_store::MultipartUpload;
+    /// #
+    /// # async fn test() {
+    /// #
+    /// let mut upload: Box<&dyn MultipartUpload> = todo!();
+    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
+    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
+    /// futures::future::try_join(p1, p2).await.unwrap();
+    /// upload.complete().await.unwrap();
+    /// # }
+    /// ```
+    ///
+    /// [R2]: 
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+    fn put_part(&mut self, data: Bytes) -> UploadPart;
+
+    /// Complete the multipart upload
+    ///
+    /// It is implementation defined behaviour if this method is called before 
polling
+    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to 
completion. Additionally,
+    /// it is implementation defined behaviour to call 
[`MultipartUpload::complete`]
+    /// on an already completed or aborted [`MultipartUpload`].
+    async fn complete(&mut self) -> Result<PutResult>;
+
+    /// Abort the multipart upload
+    ///
+    /// If a [`MultipartUpload`] is dropped without calling 
[`MultipartUpload::complete`],
+    /// some implementations will automatically reap any uploaded parts. 
However,
+    /// this is not always possible, e.g. for S3 and GCS. 
[`MultipartUpload::abort`] can
+    /// therefore be invoked to perform this cleanup.
+    ///
+    /// It is recommended that where possible users configure lifecycle rules
+    /// to automatically reap unused parts older than some threshold, as this
+    /// will more reliably handle different failure modes. See [crate::aws] and
+    /// [crate::gcp] for more information.
+    ///
+    /// It is implementation defined behaviour to call 
[`MultipartUpload::abort`]
+    /// on an already completed or aborted [`MultipartUpload`]
+    async fn abort(&mut self) -> Result<()>;
+}
+
+/// A synchronous write API for uploading data in parallel in fixed size chunks
+///
+/// Makes use of [`JoinSet`] under the hood to multiplex upload tasks,
+/// avoiding issues caused by sharing a single tokio's cooperative task
+/// budget across multiple IO operations.

Review Comment:
   I did not understand the "cooperative task budget across multiple IO 
operations" comment.  I think this could be simplified to something like: 
   
   ```suggestion
   /// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in 
parallel
   ```



##########
object_store/src/upload.rs:
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PutResult, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use tokio::task::JoinSet;
+
+/// An upload part request
+pub type UploadPart = BoxFuture<'static, Result<()>>;
+
+/// A trait allowing writing an object in fixed size chunks
+///
+/// Consecutive chunks of data can be written by calling 
[`MultipartUpload::put_part`] and polling
+/// the returned futures to completion. Multiple futures returned by 
[`MultipartUpload::put_part`]
+/// may be polled in parallel, allowing for concurrent uploads.
+///
+/// Once all part uploads have been polled to completion, the upload can be 
completed by
+/// calling [`MultipartUpload::complete`]. This will make the entire uploaded 
object visible
+/// as an atomic operation.It is implementation behind behaviour if 
[`MultipartUpload::complete`]
+/// is called before all [`UploadPart`] have been polled to completion.
+#[async_trait]
+pub trait MultipartUpload: Send + std::fmt::Debug {
+    /// Upload the next part
+    ///
+    /// Most stores require that all parts excluding the last are at least 5 
MiB, and some
+    /// further require that all parts excluding the last be the same size, 
e.g. [R2].
+    /// Clients wanting to maximise compatibility should therefore perform 
writes in
+    /// fixed size blocks larger than 5 MiB.
+    ///
+    /// Implementations may invoke this method multiple times and then await 
on the
+    /// returned futures in parallel
+    ///
+    /// ```no_run
+    /// # use futures::StreamExt;
+    /// # use object_store::MultipartUpload;
+    /// #
+    /// # async fn test() {
+    /// #
+    /// let mut upload: Box<&dyn MultipartUpload> = todo!();
+    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
+    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
+    /// futures::future::try_join(p1, p2).await.unwrap();
+    /// upload.complete().await.unwrap();
+    /// # }
+    /// ```
+    ///
+    /// [R2]: 
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+    fn put_part(&mut self, data: Bytes) -> UploadPart;
+
+    /// Complete the multipart upload
+    ///
+    /// It is implementation defined behaviour if this method is called before 
polling
+    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to 
completion. Additionally,
+    /// it is implementation defined behaviour to call 
[`MultipartUpload::complete`]
+    /// on an already completed or aborted [`MultipartUpload`].
+    async fn complete(&mut self) -> Result<PutResult>;
+
+    /// Abort the multipart upload
+    ///
+    /// If a [`MultipartUpload`] is dropped without calling 
[`MultipartUpload::complete`],
+    /// some implementations will automatically reap any uploaded parts. 
However,
+    /// this is not always possible, e.g. for S3 and GCS. 
[`MultipartUpload::abort`] can
+    /// therefore be invoked to perform this cleanup.
+    ///
+    /// It is recommended that where possible users configure lifecycle rules
+    /// to automatically reap unused parts older than some threshold, as this
+    /// will more reliably handle different failure modes. See [crate::aws] and
+    /// [crate::gcp] for more information.

Review Comment:
   I think here it helps to be explicit about "failure modes":
   
   ```suggestion
       /// Given it is not possible call `abort` in all failure scenarios (e.g. 
if your program is `SIGKILL`ed due to
       /// OOM), it is recommended to configure your object store with 
lifecycle rules
       /// to automatically cleanup unused parts older than some threshold.
       /// See [crate::aws] and [crate::gcp] for more information.
   ```



##########
object_store/src/upload.rs:
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PutResult, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use tokio::task::JoinSet;
+
+/// An upload part request
+pub type UploadPart = BoxFuture<'static, Result<()>>;
+
+/// A trait allowing writing an object in fixed size chunks
+///
+/// Consecutive chunks of data can be written by calling 
[`MultipartUpload::put_part`] and polling
+/// the returned futures to completion. Multiple futures returned by 
[`MultipartUpload::put_part`]
+/// may be polled in parallel, allowing for concurrent uploads.
+///
+/// Once all part uploads have been polled to completion, the upload can be 
completed by
+/// calling [`MultipartUpload::complete`]. This will make the entire uploaded 
object visible
+/// as an atomic operation.It is implementation behind behaviour if 
[`MultipartUpload::complete`]
+/// is called before all [`UploadPart`] have been polled to completion.
+#[async_trait]
+pub trait MultipartUpload: Send + std::fmt::Debug {
+    /// Upload the next part
+    ///
+    /// Most stores require that all parts excluding the last are at least 5 
MiB, and some
+    /// further require that all parts excluding the last be the same size, 
e.g. [R2].
+    /// Clients wanting to maximise compatibility should therefore perform 
writes in
+    /// fixed size blocks larger than 5 MiB.
+    ///
+    /// Implementations may invoke this method multiple times and then await 
on the
+    /// returned futures in parallel
+    ///
+    /// ```no_run
+    /// # use futures::StreamExt;
+    /// # use object_store::MultipartUpload;
+    /// #
+    /// # async fn test() {
+    /// #
+    /// let mut upload: Box<&dyn MultipartUpload> = todo!();
+    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
+    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
+    /// futures::future::try_join(p1, p2).await.unwrap();
+    /// upload.complete().await.unwrap();
+    /// # }
+    /// ```
+    ///
+    /// [R2]: 
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+    fn put_part(&mut self, data: Bytes) -> UploadPart;
+
+    /// Complete the multipart upload
+    ///
+    /// It is implementation defined behaviour if this method is called before 
polling
+    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to 
completion. Additionally,
+    /// it is implementation defined behaviour to call 
[`MultipartUpload::complete`]
+    /// on an already completed or aborted [`MultipartUpload`].
+    async fn complete(&mut self) -> Result<PutResult>;
+
+    /// Abort the multipart upload
+    ///
+    /// If a [`MultipartUpload`] is dropped without calling 
[`MultipartUpload::complete`],
+    /// some implementations will automatically reap any uploaded parts. 
However,
+    /// this is not always possible, e.g. for S3 and GCS. 
[`MultipartUpload::abort`] can
+    /// therefore be invoked to perform this cleanup.
+    ///
+    /// It is recommended that where possible users configure lifecycle rules
+    /// to automatically reap unused parts older than some threshold, as this
+    /// will more reliably handle different failure modes. See [crate::aws] and
+    /// [crate::gcp] for more information.
+    ///
+    /// It is implementation defined behaviour to call 
[`MultipartUpload::abort`]
+    /// on an already completed or aborted [`MultipartUpload`]
+    async fn abort(&mut self) -> Result<()>;
+}
+
+/// A synchronous write API for uploading data in parallel in fixed size chunks
+///
+/// Makes use of [`JoinSet`] under the hood to multiplex upload tasks,
+/// avoiding issues caused by sharing a single tokio's cooperative task
+/// budget across multiple IO operations.
+///
+/// The design also takes inspiration from [`Sink`] with 
[`WriteMultipart::wait_for_capacity`]
+/// allowing back pressure on producers, prior to buffering the next part. 
However, unlike
+/// [`Sink`] this back pressure is optional, allowing integration with 
synchronous producers
+///
+/// [`Sink`]: futures::sink::Sink
+#[derive(Debug)]
+pub struct WriteMultipart {
+    upload: Box<dyn MultipartUpload>,
+
+    buffer: Vec<u8>,
+
+    tasks: JoinSet<Result<()>>,
+}
+
+impl WriteMultipart {
+    /// Create a new [`WriteMultipart`]
+    pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
+        Self::new_with_capacity(upload, 5 * 1024 * 1024)
+    }
+
+    /// Create a new [`WriteMultipart`] that will upload in fixed `capacity` 
sized chunks
+    pub fn new_with_capacity(upload: Box<dyn MultipartUpload>, capacity: 
usize) -> Self {
+        Self {
+            upload,
+            buffer: Vec::with_capacity(capacity),
+            tasks: Default::default(),
+        }
+    }
+
+    /// Wait until there are `max_concurrency` or fewer requests in-flight
+    pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> 
Result<()> {
+        while self.tasks.len() > max_concurrency {
+            self.tasks.join_next().await.unwrap()??;
+        }
+        Ok(())
+    }
+
+    /// Write data to this [`WriteMultipart`]
+    ///
+    /// Back pressure can optionally be applied to producers by calling
+    /// [`Self::wait_for_capacity`] prior to calling this method
+    pub fn write(&mut self, mut buf: &[u8]) {
+        while !buf.is_empty() {
+            let capacity = self.buffer.capacity();
+            let remaining = capacity - self.buffer.len();
+            let to_read = buf.len().min(remaining);
+            self.buffer.extend_from_slice(&buf[..to_read]);
+            if to_read == remaining {
+                let part = std::mem::replace(&mut self.buffer, 
Vec::with_capacity(capacity));
+                self.put_part(part.into())
+            }
+            buf = &buf[to_read..]
+        }
+    }
+
+    fn put_part(&mut self, part: Bytes) {
+        self.tasks.spawn(self.upload.put_part(part));
+    }
+
+    /// Abort this upload

Review Comment:
   ```suggestion
       /// Abort this upload, attempting to clean up any successfully uploaded 
parts
   ```



##########
object_store/src/buffered.rs:
##########
@@ -289,9 +294,10 @@ impl AsyncWrite for BufWriter {
                         let path = std::mem::take(path);
                         let store = Arc::clone(&self.store);
                         self.state = BufWriterState::Prepare(Box::pin(async 
move {
-                            let (id, mut writer) = 
store.put_multipart(&path).await?;
-                            writer.write_all(&buffer).await?;
-                            Ok((id, writer))
+                            let upload = store.put_multipart(&path).await?;
+                            let mut chunked = WriteMultipart::new(upload);
+                            chunked.write(&buffer);

Review Comment:
   this write results in a second buffer copy, right? That double buffering is 
unfortunate



##########
object_store/src/upload.rs:
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PutResult, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use tokio::task::JoinSet;
+
+/// An upload part request
+pub type UploadPart = BoxFuture<'static, Result<()>>;
+
+/// A trait allowing writing an object in fixed size chunks
+///
+/// Consecutive chunks of data can be written by calling 
[`MultipartUpload::put_part`] and polling
+/// the returned futures to completion. Multiple futures returned by 
[`MultipartUpload::put_part`]
+/// may be polled in parallel, allowing for concurrent uploads.
+///
+/// Once all part uploads have been polled to completion, the upload can be 
completed by
+/// calling [`MultipartUpload::complete`]. This will make the entire uploaded 
object visible
+/// as an atomic operation.It is implementation behind behaviour if 
[`MultipartUpload::complete`]
+/// is called before all [`UploadPart`] have been polled to completion.
+#[async_trait]
+pub trait MultipartUpload: Send + std::fmt::Debug {
+    /// Upload the next part
+    ///
+    /// Most stores require that all parts excluding the last are at least 5 
MiB, and some
+    /// further require that all parts excluding the last be the same size, 
e.g. [R2].
+    /// Clients wanting to maximise compatibility should therefore perform 
writes in
+    /// fixed size blocks larger than 5 MiB.
+    ///
+    /// Implementations may invoke this method multiple times and then await 
on the
+    /// returned futures in parallel
+    ///
+    /// ```no_run
+    /// # use futures::StreamExt;
+    /// # use object_store::MultipartUpload;
+    /// #
+    /// # async fn test() {
+    /// #
+    /// let mut upload: Box<&dyn MultipartUpload> = todo!();
+    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
+    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
+    /// futures::future::try_join(p1, p2).await.unwrap();
+    /// upload.complete().await.unwrap();
+    /// # }
+    /// ```
+    ///
+    /// [R2]: 
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+    fn put_part(&mut self, data: Bytes) -> UploadPart;
+
+    /// Complete the multipart upload
+    ///
+    /// It is implementation defined behaviour if this method is called before 
polling
+    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to 
completion. Additionally,
+    /// it is implementation defined behaviour to call 
[`MultipartUpload::complete`]
+    /// on an already completed or aborted [`MultipartUpload`].
+    async fn complete(&mut self) -> Result<PutResult>;
+
+    /// Abort the multipart upload
+    ///
+    /// If a [`MultipartUpload`] is dropped without calling 
[`MultipartUpload::complete`],
+    /// some implementations will automatically reap any uploaded parts. 
However,
+    /// this is not always possible, e.g. for S3 and GCS. 
[`MultipartUpload::abort`] can
+    /// therefore be invoked to perform this cleanup.
+    ///
+    /// It is recommended that where possible users configure lifecycle rules
+    /// to automatically reap unused parts older than some threshold, as this
+    /// will more reliably handle different failure modes. See [crate::aws] and
+    /// [crate::gcp] for more information.
+    ///
+    /// It is implementation defined behaviour to call 
[`MultipartUpload::abort`]
+    /// on an already completed or aborted [`MultipartUpload`]
+    async fn abort(&mut self) -> Result<()>;
+}
+
+/// A synchronous write API for uploading data in parallel in fixed size chunks
+///
+/// Makes use of [`JoinSet`] under the hood to multiplex upload tasks,
+/// avoiding issues caused by sharing a single tokio's cooperative task
+/// budget across multiple IO operations.
+///
+/// The design also takes inspiration from [`Sink`] with 
[`WriteMultipart::wait_for_capacity`]
+/// allowing back pressure on producers, prior to buffering the next part. 
However, unlike
+/// [`Sink`] this back pressure is optional, allowing integration with 
synchronous producers
+///
+/// [`Sink`]: futures::sink::Sink
+#[derive(Debug)]
+pub struct WriteMultipart {
+    upload: Box<dyn MultipartUpload>,
+
+    buffer: Vec<u8>,
+
+    tasks: JoinSet<Result<()>>,
+}
+
+impl WriteMultipart {
+    /// Create a new [`WriteMultipart`]

Review Comment:
   ```suggestion
       /// Create a new [`WriteMultipart`] that will upload using 5MB chunks
   ```



##########
object_store/src/upload.rs:
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PutResult, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use tokio::task::JoinSet;
+
+/// An upload part request
+pub type UploadPart = BoxFuture<'static, Result<()>>;
+
+/// A trait allowing writing an object in fixed size chunks
+///
+/// Consecutive chunks of data can be written by calling 
[`MultipartUpload::put_part`] and polling
+/// the returned futures to completion. Multiple futures returned by 
[`MultipartUpload::put_part`]
+/// may be polled in parallel, allowing for concurrent uploads.
+///
+/// Once all part uploads have been polled to completion, the upload can be 
completed by
+/// calling [`MultipartUpload::complete`]. This will make the entire uploaded 
object visible
+/// as an atomic operation.It is implementation behind behaviour if 
[`MultipartUpload::complete`]
+/// is called before all [`UploadPart`] have been polled to completion.
+#[async_trait]
+pub trait MultipartUpload: Send + std::fmt::Debug {
+    /// Upload the next part
+    ///
+    /// Most stores require that all parts excluding the last are at least 5 
MiB, and some
+    /// further require that all parts excluding the last be the same size, 
e.g. [R2].
+    /// Clients wanting to maximise compatibility should therefore perform 
writes in
+    /// fixed size blocks larger than 5 MiB.
+    ///
+    /// Implementations may invoke this method multiple times and then await 
on the
+    /// returned futures in parallel
+    ///
+    /// ```no_run
+    /// # use futures::StreamExt;
+    /// # use object_store::MultipartUpload;
+    /// #
+    /// # async fn test() {
+    /// #
+    /// let mut upload: Box<&dyn MultipartUpload> = todo!();
+    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
+    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
+    /// futures::future::try_join(p1, p2).await.unwrap();
+    /// upload.complete().await.unwrap();
+    /// # }
+    /// ```
+    ///
+    /// [R2]: 
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+    fn put_part(&mut self, data: Bytes) -> UploadPart;
+
+    /// Complete the multipart upload
+    ///
+    /// It is implementation defined behaviour if this method is called before 
polling
+    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to 
completion. Additionally,
+    /// it is implementation defined behaviour to call 
[`MultipartUpload::complete`]
+    /// on an already completed or aborted [`MultipartUpload`].
+    async fn complete(&mut self) -> Result<PutResult>;
+
+    /// Abort the multipart upload
+    ///
+    /// If a [`MultipartUpload`] is dropped without calling 
[`MultipartUpload::complete`],
+    /// some implementations will automatically reap any uploaded parts. 
However,
+    /// this is not always possible, e.g. for S3 and GCS. 
[`MultipartUpload::abort`] can
+    /// therefore be invoked to perform this cleanup.
+    ///
+    /// It is recommended that where possible users configure lifecycle rules
+    /// to automatically reap unused parts older than some threshold, as this
+    /// will more reliably handle different failure modes. See [crate::aws] and
+    /// [crate::gcp] for more information.
+    ///
+    /// It is implementation defined behaviour to call 
[`MultipartUpload::abort`]
+    /// on an already completed or aborted [`MultipartUpload`]
+    async fn abort(&mut self) -> Result<()>;
+}
+
+/// A synchronous write API for uploading data in parallel in fixed size chunks
+///
+/// Makes use of [`JoinSet`] under the hood to multiplex upload tasks,
+/// avoiding issues caused by sharing a single tokio's cooperative task
+/// budget across multiple IO operations.
+///
+/// The design also takes inspiration from [`Sink`] with 
[`WriteMultipart::wait_for_capacity`]
+/// allowing back pressure on producers, prior to buffering the next part. 
However, unlike
+/// [`Sink`] this back pressure is optional, allowing integration with 
synchronous producers
+///
+/// [`Sink`]: futures::sink::Sink
+#[derive(Debug)]
+pub struct WriteMultipart {
+    upload: Box<dyn MultipartUpload>,
+
+    buffer: Vec<u8>,
+
+    tasks: JoinSet<Result<()>>,
+}
+
+impl WriteMultipart {
+    /// Create a new [`WriteMultipart`]
+    pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
+        Self::new_with_capacity(upload, 5 * 1024 * 1024)
+    }
+
+    /// Create a new [`WriteMultipart`] that will upload in fixed `capacity` 
sized chunks
+    pub fn new_with_capacity(upload: Box<dyn MultipartUpload>, capacity: 
usize) -> Self {
+        Self {
+            upload,
+            buffer: Vec::with_capacity(capacity),
+            tasks: Default::default(),
+        }
+    }
+
+    /// Wait until there are `max_concurrency` or fewer requests in-flight
+    pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> 
Result<()> {
+        while self.tasks.len() > max_concurrency {
+            self.tasks.join_next().await.unwrap()??;
+        }
+        Ok(())
+    }
+
+    /// Write data to this [`WriteMultipart`]
+    ///
+    /// Back pressure can optionally be applied to producers by calling

Review Comment:
   I think being more explicit about the implications would help here
   
   ```suggestion
       /// Note this method is synchronous (not `async`) and will immediately 
start new uploads
       /// as soon as the internal `capacity` is hit, regardless of 
       /// how many outstanding uploads are already in progress.
       /// 
       /// Back pressure can optionally be applied to producers by calling
   ```



##########
object_store/src/local.rs:
##########
@@ -705,194 +684,91 @@ fn staged_upload_path(dest: &std::path::Path, suffix: 
&str) -> PathBuf {
     staging_path.into()
 }
 
-enum LocalUploadState {
-    /// Upload is ready to send new data
-    Idle(Arc<File>),
-    /// In the middle of a write
-    Writing(Arc<File>, BoxFuture<'static, Result<usize, io::Error>>),
-    /// In the middle of syncing data and closing file.
-    ///
-    /// Future will contain last reference to file, so it will call drop on 
completion.
-    ShuttingDown(BoxFuture<'static, Result<(), io::Error>>),
-    /// File is being moved from it's temporary location to the final location
-    Committing(BoxFuture<'static, Result<(), io::Error>>),
-    /// Upload is complete
-    Complete,
+#[derive(Debug)]
+struct LocalUpload {
+    /// The upload state
+    state: Arc<UploadState>,
+    /// The location of the temporary file
+    src: Option<PathBuf>,
+    /// The next offset to write into the file
+    offset: u64,
 }
 
-struct LocalUpload {
-    inner_state: LocalUploadState,
+#[derive(Debug)]
+struct UploadState {
     dest: PathBuf,
-    multipart_id: MultipartId,
+    file: Mutex<Option<File>>,
 }
 
 impl LocalUpload {
-    pub fn new(dest: PathBuf, multipart_id: MultipartId, file: Arc<File>) -> 
Self {
+    pub fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
         Self {
-            inner_state: LocalUploadState::Idle(file),
-            dest,
-            multipart_id,
+            state: Arc::new(UploadState {
+                dest,
+                file: Mutex::new(Some(file)),
+            }),
+            src: Some(src),
+            offset: 0,
         }
     }
 }
 
-impl AsyncWrite for LocalUpload {
-    fn poll_write(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &[u8],
-    ) -> Poll<Result<usize, io::Error>> {
-        let invalid_state = |condition: &str| -> Poll<Result<usize, 
io::Error>> {
-            Poll::Ready(Err(io::Error::new(
-                ErrorKind::InvalidInput,
-                format!("Tried to write to file {condition}."),
-            )))
-        };
+#[async_trait]
+impl MultipartUpload for LocalUpload {
+    fn put_part(&mut self, data: Bytes) -> UploadPart {
+        let offset = self.offset;
+        self.offset += data.len() as u64;
 
-        if let Ok(runtime) = tokio::runtime::Handle::try_current() {
-            let mut data: Vec<u8> = buf.to_vec();
-            let data_len = data.len();
-
-            loop {
-                match &mut self.inner_state {
-                    LocalUploadState::Idle(file) => {
-                        let file = Arc::clone(file);
-                        let file2 = Arc::clone(&file);
-                        let data: Vec<u8> = std::mem::take(&mut data);
-                        self.inner_state = LocalUploadState::Writing(
-                            file,
-                            Box::pin(
-                                runtime
-                                    .spawn_blocking(move || 
(&*file2).write_all(&data))
-                                    .map(move |res| match res {
-                                        Err(err) => 
Err(io::Error::new(ErrorKind::Other, err)),
-                                        Ok(res) => res.map(move |_| data_len),
-                                    }),
-                            ),
-                        );
-                    }
-                    LocalUploadState::Writing(file, inner_write) => {
-                        let res = ready!(inner_write.poll_unpin(cx));
-                        self.inner_state = 
LocalUploadState::Idle(Arc::clone(file));
-                        return Poll::Ready(res);
-                    }
-                    LocalUploadState::ShuttingDown(_) => {
-                        return invalid_state("when writer is shutting down");
-                    }
-                    LocalUploadState::Committing(_) => {
-                        return invalid_state("when writer is committing data");
-                    }
-                    LocalUploadState::Complete => {
-                        return invalid_state("when writer is complete");
-                    }
-                }
-            }
-        } else if let LocalUploadState::Idle(file) = &self.inner_state {
-            let file = Arc::clone(file);
-            (&*file).write_all(buf)?;
-            Poll::Ready(Ok(buf.len()))
-        } else {
-            // If we are running on this thread, then only possible states are 
Idle and Complete.
-            invalid_state("when writer is already complete.")
-        }
+        let s = Arc::clone(&self.state);
+        maybe_spawn_blocking(move || {

Review Comment:
   nice



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to