This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new dd70b3c706 feat: Implement and refactor concurrent tasks for multipart 
write (#4653)
dd70b3c706 is described below

commit dd70b3c706f1689c2fe9455405cbd1e773c483ab
Author: Xuanwo <[email protected]>
AuthorDate: Wed May 29 01:43:57 2024 +0800

    feat: Implement and refactor concurrent tasks for multipart write (#4653)
---
 core/Cargo.toml                                    |   2 +-
 core/benches/types/concurrent_tasks.rs             |  49 +++++
 core/benches/types/main.rs                         |   2 +
 core/src/raw/futures_util.rs                       | 152 ++++++++++++++-
 core/src/raw/mod.rs                                |   1 +
 core/src/raw/oio/write/multipart_write.rs          | 216 ++++++++-------------
 core/src/services/aliyun_drive/backend.rs          |   4 +-
 core/src/services/b2/backend.rs                    |   3 +-
 core/src/services/cos/backend.rs                   |   6 +-
 core/src/services/obs/backend.rs                   |   6 +-
 core/src/services/oss/backend.rs                   |   6 +-
 core/src/services/s3/backend.rs                    |   3 +-
 core/src/services/upyun/backend.rs                 |   3 +-
 core/src/services/vercel_blob/backend.rs           |   3 +-
 core/src/types/execute/api.rs                      |  12 +-
 core/src/types/execute/executor.rs                 |   7 +-
 core/src/types/execute/executors/tokio_executor.rs |  13 +-
 17 files changed, 326 insertions(+), 162 deletions(-)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index a2e52b6a36..32fe17522f 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -232,7 +232,7 @@ chrono = { version = "0.4.28", default-features = false, 
features = [
     "std",
 ] }
 flagset = "0.4"
-futures = { version = "0.3", default-features = false, features = ["std"] }
+futures = { version = "0.3", default-features = false, features = ["std", 
"async-await"] }
 http = "1.1"
 log = "0.4"
 md-5 = "0.10"
diff --git a/core/benches/types/concurrent_tasks.rs 
b/core/benches/types/concurrent_tasks.rs
new file mode 100644
index 0000000000..0ee9ba4729
--- /dev/null
+++ b/core/benches/types/concurrent_tasks.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::{black_box, BatchSize, Criterion};
+use once_cell::sync::Lazy;
+use opendal::raw::ConcurrentTasks;
+use opendal::Executor;
+
+pub static TOKIO: Lazy<tokio::runtime::Runtime> =
+    Lazy::new(|| tokio::runtime::Runtime::new().expect("build tokio runtime"));
+
+pub fn bench_concurrent_tasks(c: &mut Criterion) {
+    let mut group = c.benchmark_group("bench_concurrent_tasks");
+
+    for concurrent in [1, 2, 4, 8, 16] {
+        group.bench_with_input(concurrent.to_string(), &concurrent, |b, 
concurrent| {
+            b.to_async(&*TOKIO).iter_batched(
+                || {
+                    ConcurrentTasks::new(Some(Executor::new()), *concurrent, 
|()| {
+                        Box::pin(async {
+                            black_box(());
+                            ((), Ok(()))
+                        })
+                    })
+                },
+                |mut tasks| async move {
+                    let _ = tasks.execute(()).await;
+                },
+                BatchSize::NumIterations(1000),
+            )
+        });
+    }
+
+    group.finish()
+}
diff --git a/core/benches/types/main.rs b/core/benches/types/main.rs
index 4e01ad0941..801ad814f7 100644
--- a/core/benches/types/main.rs
+++ b/core/benches/types/main.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 mod buffer;
+mod concurrent_tasks;
 mod utils;
 
 use criterion::criterion_group;
@@ -25,5 +26,6 @@ criterion_group!(
     benches,
     buffer::bench_non_contiguous_buffer,
     buffer::bench_non_contiguous_buffer_with_extreme,
+    concurrent_tasks::bench_concurrent_tasks,
 );
 criterion_main!(benches);
diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs
index 850b137d7e..0010c2946c 100644
--- a/core/src/raw/futures_util.rs
+++ b/core/src/raw/futures_util.rs
@@ -21,9 +21,10 @@ use std::pin::Pin;
 use std::task::Context;
 use std::task::Poll;
 
+use crate::*;
 use futures::stream::FuturesOrdered;
-use futures::FutureExt;
 use futures::StreamExt;
+use futures::{poll, FutureExt};
 
 /// BoxedFuture is the type alias of [`futures::future::BoxFuture`].
 ///
@@ -58,6 +59,155 @@ unsafe impl<T: Send> MaybeSend for T {}
 #[cfg(target_arch = "wasm32")]
 unsafe impl<T> MaybeSend for T {}
 
+/// ConcurrentTasks is used to execute tasks concurrently.
+///
+/// ConcurrentTasks has two generic types:
+///
+/// - `I` represents the input type of the task.
+/// - `O` represents the output type of the task.
+pub struct ConcurrentTasks<I, O> {
+    /// The executor to execute the tasks.
+    ///
+    /// If user doesn't provide an executor, the tasks will be executed with 
the default executor.
+    executor: Executor,
+    /// The factory to create the task.
+    ///
+    /// Caller of ConcurrentTasks must provide a factory to create the task 
for executing.
+    ///
+    /// The factory must accept an input and return a future that resolves to 
a tuple of input and
output result. If the given result is an error, the error will be 
returned to users and the
+    /// task will be retried.
+    factory: fn(I) -> BoxedStaticFuture<(I, Result<O>)>,
+
+    /// `tasks` holds the ongoing tasks.
+    ///
+    /// Please keep in mind that all tasks are run in the background by 
`Executor`. We only need
+    /// to poll the tasks to see if they are ready.
+    ///
+    /// Dropping task without `await` it will cancel the task.
+    tasks: VecDeque<Task<(I, Result<O>)>>,
+    /// `results` stores the successful results.
+    results: VecDeque<O>,
+}
+
+impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
+    /// Create a new concurrent tasks with given executor, concurrent and 
factory.
+    ///
+    /// The factory is a function pointer that shouldn't capture any context.
+    pub fn new(
+        executor: Option<Executor>,
+        concurrent: usize,
+        factory: fn(I) -> BoxedStaticFuture<(I, Result<O>)>,
+    ) -> Self {
+        Self {
+            executor: executor.unwrap_or_default(),
+            factory,
+
+            tasks: VecDeque::with_capacity(concurrent),
+            results: VecDeque::with_capacity(concurrent),
+        }
+    }
+
+    /// Return true if the tasks are running concurrently.
+    #[inline]
+    fn is_concurrent(&self) -> bool {
+        self.tasks.capacity() > 1
+    }
+
+    /// Clear all tasks and results.
+    ///
+    /// All ongoing tasks will be canceled.
+    pub fn clear(&mut self) {
+        self.tasks.clear();
+        self.results.clear();
+    }
+
+    /// Execute the task with given input.
+    ///
+    /// - Execute the task in the current thread if is not concurrent.
+    /// - Execute the task in the background if there are available slots.
+    /// - Await the first task in the queue if there are no available slots.
+    pub async fn execute(&mut self, input: I) -> Result<()> {
+        // Short path for non-concurrent case.
+        if !self.is_concurrent() {
+            let (_, o) = (self.factory)(input).await;
+            return match o {
+                Ok(o) => {
+                    self.results.push_back(o);
+                    Ok(())
+                }
+                // We don't need to rebuild the future if it's not concurrent.
+                Err(err) => Err(err),
+            };
+        }
+
+        loop {
+            // Try poll once to see if there is any ready task.
+            if let Some(mut task) = self.tasks.pop_front() {
+                if let Poll::Ready((i, o)) = poll!(&mut task) {
+                    match o {
+                        Ok(o) => self.results.push_back(o),
+                        Err(err) => {
+                            self.tasks
+                                
.push_front(self.executor.execute((self.factory)(i)));
+                            return Err(err);
+                        }
+                    }
+                } else {
+                    // task is not ready, push it back.
+                    self.tasks.push_front(task)
+                }
+            }
+
+            // Try to push a new task if there is available space.
+            if self.tasks.len() < self.tasks.capacity() {
+                self.tasks
+                    .push_back(self.executor.execute((self.factory)(input)));
+                return Ok(());
+            }
+
+            // Wait for the next task to be ready.
+            let task = self
+                .tasks
+                .pop_front()
+                .expect("tasks must have at least one task");
+            let (i, o) = task.await;
+            match o {
+                Ok(o) => {
+                    self.results.push_back(o);
+                    continue;
+                }
+                Err(err) => {
+                    self.tasks
+                        .push_front(self.executor.execute((self.factory)(i)));
+                    return Err(err);
+                }
+            }
+        }
+    }
+
+    /// Fetch the successful result from the result queue.
+    pub async fn next(&mut self) -> Option<Result<O>> {
+        if let Some(result) = self.results.pop_front() {
+            return Some(Ok(result));
+        }
+
+        if let Some(task) = self.tasks.pop_front() {
+            let (i, o) = task.await;
+            match o {
+                Ok(o) => return Some(Ok(o)),
+                Err(err) => {
+                    self.tasks
+                        .push_front(self.executor.execute((self.factory)(i)));
+                    return Some(Err(err));
+                }
+            }
+        }
+
+        None
+    }
+}
+
 /// CONCURRENT_LARGE_THRESHOLD is the threshold to determine whether to use
 /// [`FuturesOrdered`] or not.
 ///
diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs
index fc2965e377..b4d6b3ca96 100644
--- a/core/src/raw/mod.rs
+++ b/core/src/raw/mod.rs
@@ -73,6 +73,7 @@ mod futures_util;
 pub use futures_util::BoxedFuture;
 pub use futures_util::BoxedStaticFuture;
 pub use futures_util::ConcurrentFutures;
+pub use futures_util::ConcurrentTasks;
 pub use futures_util::MaybeSend;
 
 mod enum_utils;
diff --git a/core/src/raw/oio/write/multipart_write.rs 
b/core/src/raw/oio/write/multipart_write.rs
index 0582553246..77179a20ce 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -15,14 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::pin::Pin;
 use std::sync::Arc;
-use std::task::Context;
-use std::task::Poll;
 
 use futures::Future;
-use futures::FutureExt;
-use futures::StreamExt;
 
 use crate::raw::*;
 use crate::*;
@@ -119,45 +114,11 @@ pub struct MultipartPart {
     pub checksum: Option<String>,
 }
 
-/// WritePartResult is the result returned by [`WritePartFuture`].
-///
-/// The error part will carries input `(part_number, buffer, err)` so caller 
can retry them.
-type WritePartResult = std::result::Result<MultipartPart, (usize, Buffer, 
Error)>;
-
-struct WritePartFuture(BoxedStaticFuture<WritePartResult>);
-
-/// # Safety
-///
-/// wasm32 is a special target that we only have one event-loop for this 
WritePartFuture.
-unsafe impl Send for WritePartFuture {}
-
-/// # Safety
-///
-/// We will only take `&mut Self` reference for WritePartFuture.
-unsafe impl Sync for WritePartFuture {}
-
-impl Future for WritePartFuture {
-    type Output = WritePartResult;
-    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        self.get_mut().0.poll_unpin(cx)
-    }
-}
-
-impl WritePartFuture {
-    pub fn new<W: MultipartWrite>(
-        w: Arc<W>,
-        upload_id: Arc<String>,
-        part_number: usize,
-        bytes: Buffer,
-    ) -> Self {
-        let fut = async move {
-            w.write_part(&upload_id, part_number, bytes.len() as u64, 
bytes.clone())
-                .await
-                .map_err(|err| (part_number, bytes, err))
-        };
-
-        WritePartFuture(Box::pin(fut))
-    }
+struct WriteInput<W: MultipartWrite> {
+    w: Arc<W>,
+    upload_id: Arc<String>,
+    part_number: usize,
+    bytes: Buffer,
 }
 
 /// MultipartWriter will implements [`Write`] based on multipart
@@ -168,8 +129,9 @@ pub struct MultipartWriter<W: MultipartWrite> {
     upload_id: Option<Arc<String>>,
     parts: Vec<MultipartPart>,
     cache: Option<Buffer>,
-    futures: ConcurrentFutures<WritePartFuture>,
     next_part_number: usize,
+
+    tasks: ConcurrentTasks<WriteInput<W>, MultipartPart>,
 }
 
 /// # Safety
@@ -178,14 +140,31 @@ pub struct MultipartWriter<W: MultipartWrite> {
 
 impl<W: MultipartWrite> MultipartWriter<W> {
     /// Create a new MultipartWriter.
-    pub fn new(inner: W, concurrent: usize) -> Self {
+    pub fn new(inner: W, executor: Option<Executor>, concurrent: usize) -> 
Self {
+        let w = Arc::new(inner);
         Self {
-            w: Arc::new(inner),
+            w,
             upload_id: None,
             parts: Vec::new(),
             cache: None,
-            futures: ConcurrentFutures::new(1.max(concurrent)),
             next_part_number: 0,
+
+            tasks: ConcurrentTasks::new(executor, concurrent, |input| {
+                Box::pin({
+                    async move {
+                        let result = input
+                            .w
+                            .write_part(
+                                &input.upload_id,
+                                input.part_number,
+                                input.bytes.len() as u64,
+                                input.bytes.clone(),
+                            )
+                            .await;
+                        (input, result)
+                    }
+                })
+            }),
         }
     }
 
@@ -218,40 +197,21 @@ where
             }
         };
 
-        loop {
-            if self.futures.has_remaining() {
-                let cache = self.cache.take().expect("pending write must 
exist");
-                let part_number = self.next_part_number;
-                self.next_part_number += 1;
-
-                self.futures.push_back(WritePartFuture::new(
-                    self.w.clone(),
-                    upload_id.clone(),
-                    part_number,
-                    cache,
-                ));
-                let size = self.fill_cache(bs);
-                return Ok(size);
-            }
+        let bytes = self.cache.clone().expect("pending write must exist");
+        let part_number = self.next_part_number;
 
-            if let Some(part) = self.futures.next().await {
-                match part {
-                    Ok(part) => {
-                        self.parts.push(part);
-                        continue;
-                    }
-                    Err((part_number, bytes, err)) => {
-                        self.futures.push_front(WritePartFuture::new(
-                            self.w.clone(),
-                            upload_id.clone(),
-                            part_number,
-                            bytes,
-                        ));
-                        return Err(err);
-                    }
-                }
-            }
-        }
+        self.tasks
+            .execute(WriteInput {
+                w: self.w.clone(),
+                upload_id: upload_id.clone(),
+                part_number,
+                bytes,
+            })
+            .await?;
+        self.cache = None;
+        self.next_part_number += 1;
+        let size = self.fill_cache(bs);
+        Ok(size)
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -269,45 +229,29 @@ where
             }
         };
 
-        loop {
-            // futures queue is empty and cache is consumed, we can complete 
the upload.
-            if self.futures.is_empty() && self.cache.is_none() {
-                return self.w.complete_part(&upload_id, &self.parts).await;
-            }
+        if let Some(cache) = self.cache.clone() {
+            let part_number = self.next_part_number;
 
-            if self.futures.has_remaining() {
-                // This must be the final task.
-                if let Some(cache) = self.cache.take() {
-                    let part_number = self.next_part_number;
-                    self.next_part_number += 1;
-
-                    self.futures.push_back(WritePartFuture::new(
-                        self.w.clone(),
-                        upload_id.clone(),
-                        part_number,
-                        cache,
-                    ));
-                }
-            }
+            self.tasks
+                .execute(WriteInput {
+                    w: self.w.clone(),
+                    upload_id: upload_id.clone(),
+                    part_number,
+                    bytes: cache,
+                })
+                .await?;
+            self.cache = None;
+            self.next_part_number += 1;
+        }
 
-            if let Some(part) = self.futures.next().await {
-                match part {
-                    Ok(part) => {
-                        self.parts.push(part);
-                        continue;
-                    }
-                    Err((part_number, bytes, err)) => {
-                        self.futures.push_front(WritePartFuture::new(
-                            self.w.clone(),
-                            upload_id.clone(),
-                            part_number,
-                            bytes,
-                        ));
-                        return Err(err);
-                    }
-                }
-            }
+        loop {
+            let Some(result) = self.tasks.next().await.transpose()? else {
+                break;
+            };
+            self.parts.push(result)
         }
+
+        self.w.complete_part(&upload_id, &self.parts).await
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -315,7 +259,7 @@ where
             return Ok(());
         };
 
-        self.futures.clear();
+        self.tasks.clear();
         self.w.abort_part(&upload_id).await?;
         self.cache = None;
         Ok(())
@@ -324,12 +268,14 @@ where
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Mutex;
+    use std::time::Duration;
 
     use pretty_assertions::assert_eq;
     use rand::thread_rng;
     use rand::Rng;
     use rand::RngCore;
+    use tokio::sync::Mutex;
+    use tokio::time::sleep;
 
     use super::*;
     use crate::raw::oio::Write;
@@ -354,12 +300,12 @@ mod tests {
 
     impl MultipartWrite for Arc<Mutex<TestWrite>> {
         async fn write_once(&self, size: u64, _: Buffer) -> Result<()> {
-            self.lock().unwrap().length += size;
+            self.lock().await.length += size;
             Ok(())
         }
 
         async fn initiate_part(&self) -> Result<String> {
-            let upload_id = self.lock().unwrap().upload_id.clone();
+            let upload_id = self.lock().await.upload_id.clone();
             Ok(upload_id)
         }
 
@@ -370,16 +316,24 @@ mod tests {
             size: u64,
             _: Buffer,
         ) -> Result<MultipartPart> {
-            let mut test = self.lock().unwrap();
-            assert_eq!(upload_id, test.upload_id);
+            {
+                let test = self.lock().await;
+                assert_eq!(upload_id, test.upload_id);
+            }
+
+            // Add an async sleep here to enforce some pending.
+            sleep(Duration::from_millis(50)).await;
 
-            // We will have 50% percent rate for write part to fail.
-            if thread_rng().gen_bool(5.0 / 10.0) {
+            // We will have a 10% chance for write part to fail.
+            if thread_rng().gen_bool(1.0 / 10.0) {
                 return Err(Error::new(ErrorKind::Unexpected, "I'm a crazy 
monkey!"));
             }
 
-            test.part_numbers.push(part_number);
-            test.length += size;
+            {
+                let mut test = self.lock().await;
+                test.part_numbers.push(part_number);
+                test.length += size;
+            }
 
             Ok(MultipartPart {
                 part_number,
@@ -389,7 +343,7 @@ mod tests {
         }
 
         async fn complete_part(&self, upload_id: &str, parts: 
&[MultipartPart]) -> Result<()> {
-            let test = self.lock().unwrap();
+            let test = self.lock().await;
             assert_eq!(upload_id, test.upload_id);
             assert_eq!(parts.len(), test.part_numbers.len());
 
@@ -397,7 +351,7 @@ mod tests {
         }
 
         async fn abort_part(&self, upload_id: &str) -> Result<()> {
-            let test = self.lock().unwrap();
+            let test = self.lock().await;
             assert_eq!(upload_id, test.upload_id);
 
             Ok(())
@@ -408,7 +362,7 @@ mod tests {
     async fn test_multipart_upload_writer_with_concurrent_errors() {
         let mut rng = thread_rng();
 
-        let mut w = MultipartWriter::new(TestWrite::new(), 8);
+        let mut w = MultipartWriter::new(TestWrite::new(), 
Some(Executor::new()), 200);
         let mut total_size = 0u64;
 
         for _ in 0..1000 {
@@ -437,7 +391,7 @@ mod tests {
         let expected_parts: Vec<_> = (0..1000).collect();
         assert_eq!(actual_parts, expected_parts);
 
-        let actual_size = w.w.lock().unwrap().length;
+        let actual_size = w.w.lock().await.length;
         assert_eq!(actual_size, total_size);
     }
 }
diff --git a/core/src/services/aliyun_drive/backend.rs 
b/core/src/services/aliyun_drive/backend.rs
index 79eb7169d9..47598e7190 100644
--- a/core/src/services/aliyun_drive/backend.rs
+++ b/core/src/services/aliyun_drive/backend.rs
@@ -461,10 +461,12 @@ impl Access for AliyunDriveBackend {
         let parent_path = get_parent(path);
         let parent_file_id = self.core.ensure_dir_exists(parent_path).await?;
 
+        let executor = args.executor().cloned();
+
         let writer =
             AliyunDriveWriter::new(self.core.clone(), &parent_file_id, 
get_basename(path), args);
 
-        let w = oio::MultipartWriter::new(writer, 1);
+        let w = oio::MultipartWriter::new(writer, executor, 1);
 
         Ok((RpWrite::default(), w))
     }
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
index 2a713e3057..2c99176b32 100644
--- a/core/src/services/b2/backend.rs
+++ b/core/src/services/b2/backend.rs
@@ -360,9 +360,10 @@ impl Access for B2Backend {
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         let concurrent = args.concurrent();
+        let executor = args.executor().cloned();
         let writer = B2Writer::new(self.core.clone(), path, args);
 
-        let w = oio::MultipartWriter::new(writer, concurrent);
+        let w = oio::MultipartWriter::new(writer, executor, concurrent);
 
         Ok((RpWrite::default(), w))
     }
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index a4e9cfed09..4347a46407 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -336,7 +336,11 @@ impl Access for CosBackend {
         let w = if args.append() {
             CosWriters::Two(oio::AppendWriter::new(writer))
         } else {
-            CosWriters::One(oio::MultipartWriter::new(writer, 
args.concurrent()))
+            CosWriters::One(oio::MultipartWriter::new(
+                writer,
+                args.executor().cloned(),
+                args.concurrent(),
+            ))
         };
 
         Ok((RpWrite::default(), w))
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 8f81509706..efeced3436 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -333,7 +333,11 @@ impl Access for ObsBackend {
         let w = if args.append() {
             ObsWriters::Two(oio::AppendWriter::new(writer))
         } else {
-            ObsWriters::One(oio::MultipartWriter::new(writer, 
args.concurrent()))
+            ObsWriters::One(oio::MultipartWriter::new(
+                writer,
+                args.executor().cloned(),
+                args.concurrent(),
+            ))
         };
 
         Ok((RpWrite::default(), w))
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 9780b5ec70..fc57fabcf4 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -466,7 +466,11 @@ impl Access for OssBackend {
         let w = if args.append() {
             OssWriters::Two(oio::AppendWriter::new(writer))
         } else {
-            OssWriters::One(oio::MultipartWriter::new(writer, 
args.concurrent()))
+            OssWriters::One(oio::MultipartWriter::new(
+                writer,
+                args.executor().cloned(),
+                args.concurrent(),
+            ))
         };
 
         Ok((RpWrite::default(), w))
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index c990413c1e..2f3a49b1f2 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -1107,9 +1107,10 @@ impl Access for S3Backend {
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         let concurrent = args.concurrent();
+        let executor = args.executor().cloned();
         let writer = S3Writer::new(self.core.clone(), path, args);
 
-        let w = oio::MultipartWriter::new(writer, concurrent);
+        let w = oio::MultipartWriter::new(writer, executor, concurrent);
 
         Ok((RpWrite::default(), w))
     }
diff --git a/core/src/services/upyun/backend.rs 
b/core/src/services/upyun/backend.rs
index bc3342917e..4603964665 100644
--- a/core/src/services/upyun/backend.rs
+++ b/core/src/services/upyun/backend.rs
@@ -304,9 +304,10 @@ impl Access for UpyunBackend {
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         let concurrent = args.concurrent();
+        let executor = args.executor().cloned();
         let writer = UpyunWriter::new(self.core.clone(), args, 
path.to_string());
 
-        let w = oio::MultipartWriter::new(writer, concurrent);
+        let w = oio::MultipartWriter::new(writer, executor, concurrent);
 
         Ok((RpWrite::default(), w))
     }
diff --git a/core/src/services/vercel_blob/backend.rs 
b/core/src/services/vercel_blob/backend.rs
index c4857a5272..51489a702d 100644
--- a/core/src/services/vercel_blob/backend.rs
+++ b/core/src/services/vercel_blob/backend.rs
@@ -238,9 +238,10 @@ impl Access for VercelBlobBackend {
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         let concurrent = args.concurrent();
+        let executor = args.executor().cloned();
         let writer = VercelBlobWriter::new(self.core.clone(), args, 
path.to_string());
 
-        let w = oio::MultipartWriter::new(writer, concurrent);
+        let w = oio::MultipartWriter::new(writer, executor, concurrent);
 
         Ok((RpWrite::default(), w))
     }
diff --git a/core/src/types/execute/api.rs b/core/src/types/execute/api.rs
index 8307d87aae..95844c7f30 100644
--- a/core/src/types/execute/api.rs
+++ b/core/src/types/execute/api.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::raw::BoxedStaticFuture;
-use crate::*;
 use futures::future::RemoteHandle;
 use futures::FutureExt;
 use std::future::Future;
@@ -30,16 +29,12 @@ pub trait Execute: Send + Sync + 'static {
     /// # Behavior
     ///
     /// - Implementor must manage the executing futures and keep making 
progress.
-    /// - Implementor must return `Error::Unexpected` if failed to execute new 
task.
-    fn execute(&self, f: BoxedStaticFuture<()>) -> Result<()>;
+    fn execute(&self, f: BoxedStaticFuture<()>);
 }
 
 impl Execute for () {
-    fn execute(&self, _: BoxedStaticFuture<()>) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unexpected,
-            "no executor has been set",
-        ))
+    fn execute(&self, _: BoxedStaticFuture<()>) {
+        panic!("concurrent tasks executed with no executor has been enabled")
     }
 }
 
@@ -59,7 +54,6 @@ pub struct Task<T> {
 impl<T: 'static> Task<T> {
     /// Create a new task.
     #[inline]
-    #[allow(unused)]
     pub fn new(handle: RemoteHandle<T>) -> Self {
         Self { handle }
     }
diff --git a/core/src/types/execute/executor.rs 
b/core/src/types/execute/executor.rs
index d516e38aa8..47e0e5dbd9 100644
--- a/core/src/types/execute/executor.rs
+++ b/core/src/types/execute/executor.rs
@@ -17,7 +17,6 @@
 
 use super::*;
 use crate::raw::MaybeSend;
-use crate::*;
 use futures::FutureExt;
 use std::fmt::{Debug, Formatter};
 use std::future::Future;
@@ -72,13 +71,13 @@ impl Executor {
 
     /// Run given future in background immediately.
     #[allow(unused)]
-    pub(crate) fn execute<F>(&self, f: F) -> Result<Task<F::Output>>
+    pub(crate) fn execute<F>(&self, f: F) -> Task<F::Output>
     where
         F: Future + MaybeSend + 'static,
         F::Output: MaybeSend + 'static,
     {
         let (fut, handle) = f.remote_handle();
-        self.executor.execute(Box::pin(fut))?;
-        Ok(Task::new(handle))
+        self.executor.execute(Box::pin(fut));
+        Task::new(handle)
     }
 }
diff --git a/core/src/types/execute/executors/tokio_executor.rs 
b/core/src/types/execute/executors/tokio_executor.rs
index 73ecd87ec3..f6e0eb8373 100644
--- a/core/src/types/execute/executors/tokio_executor.rs
+++ b/core/src/types/execute/executors/tokio_executor.rs
@@ -24,9 +24,8 @@ pub struct TokioExecutor {}
 
 impl Execute for TokioExecutor {
     /// Tokio's JoinHandle has it's own `abort` support, so dropping handle 
won't cancel the task.
-    fn execute(&self, f: BoxedStaticFuture<()>) -> Result<()> {
+    fn execute(&self, f: BoxedStaticFuture<()>) {
         let _handle = tokio::task::spawn(f);
-        Ok(())
     }
 }
 
@@ -46,12 +45,10 @@ mod tests {
         let finished = Arc::new(AtomicBool::new(false));
 
         let finished_clone = finished.clone();
-        let _task = executor
-            .execute(async move {
-                sleep(Duration::from_secs(1)).await;
-                finished_clone.store(true, Ordering::Relaxed);
-            })
-            .unwrap();
+        let _task = executor.execute(async move {
+            sleep(Duration::from_secs(1)).await;
+            finished_clone.store(true, Ordering::Relaxed);
+        });
 
         sleep(Duration::from_secs(2)).await;
         // Task must has been finished even without await task.

Reply via email to