This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new dd70b3c706 feat: Implement and refactor concurrent tasks for multipart
write (#4653)
dd70b3c706 is described below
commit dd70b3c706f1689c2fe9455405cbd1e773c483ab
Author: Xuanwo <[email protected]>
AuthorDate: Wed May 29 01:43:57 2024 +0800
feat: Implement and refactor concurrent tasks for multipart write (#4653)
---
core/Cargo.toml | 2 +-
core/benches/types/concurrent_tasks.rs | 49 +++++
core/benches/types/main.rs | 2 +
core/src/raw/futures_util.rs | 152 ++++++++++++++-
core/src/raw/mod.rs | 1 +
core/src/raw/oio/write/multipart_write.rs | 216 ++++++++-------------
core/src/services/aliyun_drive/backend.rs | 4 +-
core/src/services/b2/backend.rs | 3 +-
core/src/services/cos/backend.rs | 6 +-
core/src/services/obs/backend.rs | 6 +-
core/src/services/oss/backend.rs | 6 +-
core/src/services/s3/backend.rs | 3 +-
core/src/services/upyun/backend.rs | 3 +-
core/src/services/vercel_blob/backend.rs | 3 +-
core/src/types/execute/api.rs | 12 +-
core/src/types/execute/executor.rs | 7 +-
core/src/types/execute/executors/tokio_executor.rs | 13 +-
17 files changed, 326 insertions(+), 162 deletions(-)
diff --git a/core/Cargo.toml b/core/Cargo.toml
index a2e52b6a36..32fe17522f 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -232,7 +232,7 @@ chrono = { version = "0.4.28", default-features = false,
features = [
"std",
] }
flagset = "0.4"
-futures = { version = "0.3", default-features = false, features = ["std"] }
+futures = { version = "0.3", default-features = false, features = ["std",
"async-await"] }
http = "1.1"
log = "0.4"
md-5 = "0.10"
diff --git a/core/benches/types/concurrent_tasks.rs
b/core/benches/types/concurrent_tasks.rs
new file mode 100644
index 0000000000..0ee9ba4729
--- /dev/null
+++ b/core/benches/types/concurrent_tasks.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::{black_box, BatchSize, Criterion};
+use once_cell::sync::Lazy;
+use opendal::raw::ConcurrentTasks;
+use opendal::Executor;
+
+pub static TOKIO: Lazy<tokio::runtime::Runtime> =
+ Lazy::new(|| tokio::runtime::Runtime::new().expect("build tokio runtime"));
+
+pub fn bench_concurrent_tasks(c: &mut Criterion) {
+ let mut group = c.benchmark_group("bench_concurrent_tasks");
+
+ for concurrent in [1, 2, 4, 8, 16] {
+ group.bench_with_input(concurrent.to_string(), &concurrent, |b,
concurrent| {
+ b.to_async(&*TOKIO).iter_batched(
+ || {
+ ConcurrentTasks::new(Some(Executor::new()), *concurrent,
|()| {
+ Box::pin(async {
+ black_box(());
+ ((), Ok(()))
+ })
+ })
+ },
+ |mut tasks| async move {
+ let _ = tasks.execute(()).await;
+ },
+ BatchSize::NumIterations(1000),
+ )
+ });
+ }
+
+ group.finish()
+}
diff --git a/core/benches/types/main.rs b/core/benches/types/main.rs
index 4e01ad0941..801ad814f7 100644
--- a/core/benches/types/main.rs
+++ b/core/benches/types/main.rs
@@ -16,6 +16,7 @@
// under the License.
mod buffer;
+mod concurrent_tasks;
mod utils;
use criterion::criterion_group;
@@ -25,5 +26,6 @@ criterion_group!(
benches,
buffer::bench_non_contiguous_buffer,
buffer::bench_non_contiguous_buffer_with_extreme,
+ concurrent_tasks::bench_concurrent_tasks,
);
criterion_main!(benches);
diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs
index 850b137d7e..0010c2946c 100644
--- a/core/src/raw/futures_util.rs
+++ b/core/src/raw/futures_util.rs
@@ -21,9 +21,10 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
+use crate::*;
use futures::stream::FuturesOrdered;
-use futures::FutureExt;
use futures::StreamExt;
+use futures::{poll, FutureExt};
/// BoxedFuture is the type alias of [`futures::future::BoxFuture`].
///
@@ -58,6 +59,155 @@ unsafe impl<T: Send> MaybeSend for T {}
#[cfg(target_arch = "wasm32")]
unsafe impl<T> MaybeSend for T {}
+/// ConcurrentTasks is used to execute tasks concurrently.
+///
+/// ConcurrentTasks has two generic types:
+///
+/// - `I` represents the input type of the task.
+/// - `O` represents the output type of the task.
+pub struct ConcurrentTasks<I, O> {
+ /// The executor to execute the tasks.
+ ///
+ /// If user doesn't provide an executor, the tasks will be executed with
the default executor.
+ executor: Executor,
+ /// The factory to create the task.
+ ///
+ /// Caller of ConcurrentTasks must provides a factory to create the task
for executing.
+ ///
+ /// The factory must accept an input and return a future that resolves to
a tuple of input and
+ /// output result. If the given result is error, the error will be
returned to users and the
+ /// task will be retried.
+ factory: fn(I) -> BoxedStaticFuture<(I, Result<O>)>,
+
+ /// `tasks` holds the ongoing tasks.
+ ///
+ /// Please keep in mind that all tasks are running in the background by
`Executor`. We only need
+ /// to poll the tasks to see if they are ready.
+ ///
+ /// Dropping task without `await` it will cancel the task.
+ tasks: VecDeque<Task<(I, Result<O>)>>,
+ /// `results` stores the successful results.
+ results: VecDeque<O>,
+}
+
+impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
+ /// Create a new concurrent tasks with given executor, concurrent and
factory.
+ ///
+ /// The factory is a function pointer that shouldn't capture any context.
+ pub fn new(
+ executor: Option<Executor>,
+ concurrent: usize,
+ factory: fn(I) -> BoxedStaticFuture<(I, Result<O>)>,
+ ) -> Self {
+ Self {
+ executor: executor.unwrap_or_default(),
+ factory,
+
+ tasks: VecDeque::with_capacity(concurrent),
+ results: VecDeque::with_capacity(concurrent),
+ }
+ }
+
+ /// Return true if the tasks are running concurrently.
+ #[inline]
+ fn is_concurrent(&self) -> bool {
+ self.tasks.capacity() > 1
+ }
+
+ /// Clear all tasks and results.
+ ///
+ /// All ongoing tasks will be canceled.
+ pub fn clear(&mut self) {
+ self.tasks.clear();
+ self.results.clear();
+ }
+
+ /// Execute the task with given input.
+ ///
+ /// - Execute the task in the current thread if is not concurrent.
+ /// - Execute the task in the background if there are available slots.
+ /// - Await the first task in the queue if there is no available slots.
+ pub async fn execute(&mut self, input: I) -> Result<()> {
+ // Short path for non-concurrent case.
+ if !self.is_concurrent() {
+ let (_, o) = (self.factory)(input).await;
+ return match o {
+ Ok(o) => {
+ self.results.push_back(o);
+ Ok(())
+ }
+ // We don't need to rebuild the future if it's not concurrent.
+ Err(err) => Err(err),
+ };
+ }
+
+ loop {
+ // Try poll once to see if there is any ready task.
+ if let Some(mut task) = self.tasks.pop_front() {
+ if let Poll::Ready((i, o)) = poll!(&mut task) {
+ match o {
+ Ok(o) => self.results.push_back(o),
+ Err(err) => {
+ self.tasks
+
.push_front(self.executor.execute((self.factory)(i)));
+ return Err(err);
+ }
+ }
+ } else {
+ // task is not ready, push it back.
+ self.tasks.push_front(task)
+ }
+ }
+
+ // Try to push new task if there are available space.
+ if self.tasks.len() < self.tasks.capacity() {
+ self.tasks
+ .push_back(self.executor.execute((self.factory)(input)));
+ return Ok(());
+ }
+
+ // Wait for the next task to be ready.
+ let task = self
+ .tasks
+ .pop_front()
+ .expect("tasks must have at least one task");
+ let (i, o) = task.await;
+ match o {
+ Ok(o) => {
+ self.results.push_back(o);
+ continue;
+ }
+ Err(err) => {
+ self.tasks
+ .push_front(self.executor.execute((self.factory)(i)));
+ return Err(err);
+ }
+ }
+ }
+ }
+
+ /// Fetch the successful result from the result queue.
+ pub async fn next(&mut self) -> Option<Result<O>> {
+ if let Some(result) = self.results.pop_front() {
+ return Some(Ok(result));
+ }
+
+ if let Some(task) = self.tasks.pop_front() {
+ let (i, o) = task.await;
+ match o {
+ Ok(o) => return Some(Ok(o)),
+ Err(err) => {
+ self.tasks
+ .push_front(self.executor.execute((self.factory)(i)));
+ return Some(Err(err));
+ }
+ }
+ }
+
+ None
+ }
+}
+
/// CONCURRENT_LARGE_THRESHOLD is the threshold to determine whether to use
/// [`FuturesOrdered`] or not.
///
diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs
index fc2965e377..b4d6b3ca96 100644
--- a/core/src/raw/mod.rs
+++ b/core/src/raw/mod.rs
@@ -73,6 +73,7 @@ mod futures_util;
pub use futures_util::BoxedFuture;
pub use futures_util::BoxedStaticFuture;
pub use futures_util::ConcurrentFutures;
+pub use futures_util::ConcurrentTasks;
pub use futures_util::MaybeSend;
mod enum_utils;
diff --git a/core/src/raw/oio/write/multipart_write.rs
b/core/src/raw/oio/write/multipart_write.rs
index 0582553246..77179a20ce 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -15,14 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use std::pin::Pin;
use std::sync::Arc;
-use std::task::Context;
-use std::task::Poll;
use futures::Future;
-use futures::FutureExt;
-use futures::StreamExt;
use crate::raw::*;
use crate::*;
@@ -119,45 +114,11 @@ pub struct MultipartPart {
pub checksum: Option<String>,
}
-/// WritePartResult is the result returned by [`WritePartFuture`].
-///
-/// The error part will carries input `(part_number, buffer, err)` so caller
can retry them.
-type WritePartResult = std::result::Result<MultipartPart, (usize, Buffer,
Error)>;
-
-struct WritePartFuture(BoxedStaticFuture<WritePartResult>);
-
-/// # Safety
-///
-/// wasm32 is a special target that we only have one event-loop for this
WritePartFuture.
-unsafe impl Send for WritePartFuture {}
-
-/// # Safety
-///
-/// We will only take `&mut Self` reference for WritePartFuture.
-unsafe impl Sync for WritePartFuture {}
-
-impl Future for WritePartFuture {
- type Output = WritePartResult;
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- self.get_mut().0.poll_unpin(cx)
- }
-}
-
-impl WritePartFuture {
- pub fn new<W: MultipartWrite>(
- w: Arc<W>,
- upload_id: Arc<String>,
- part_number: usize,
- bytes: Buffer,
- ) -> Self {
- let fut = async move {
- w.write_part(&upload_id, part_number, bytes.len() as u64,
bytes.clone())
- .await
- .map_err(|err| (part_number, bytes, err))
- };
-
- WritePartFuture(Box::pin(fut))
- }
+struct WriteInput<W: MultipartWrite> {
+ w: Arc<W>,
+ upload_id: Arc<String>,
+ part_number: usize,
+ bytes: Buffer,
}
/// MultipartWriter will implements [`Write`] based on multipart
@@ -168,8 +129,9 @@ pub struct MultipartWriter<W: MultipartWrite> {
upload_id: Option<Arc<String>>,
parts: Vec<MultipartPart>,
cache: Option<Buffer>,
- futures: ConcurrentFutures<WritePartFuture>,
next_part_number: usize,
+
+ tasks: ConcurrentTasks<WriteInput<W>, MultipartPart>,
}
/// # Safety
@@ -178,14 +140,31 @@ pub struct MultipartWriter<W: MultipartWrite> {
impl<W: MultipartWrite> MultipartWriter<W> {
/// Create a new MultipartWriter.
- pub fn new(inner: W, concurrent: usize) -> Self {
+ pub fn new(inner: W, executor: Option<Executor>, concurrent: usize) ->
Self {
+ let w = Arc::new(inner);
Self {
- w: Arc::new(inner),
+ w,
upload_id: None,
parts: Vec::new(),
cache: None,
- futures: ConcurrentFutures::new(1.max(concurrent)),
next_part_number: 0,
+
+ tasks: ConcurrentTasks::new(executor, concurrent, |input| {
+ Box::pin({
+ async move {
+ let result = input
+ .w
+ .write_part(
+ &input.upload_id,
+ input.part_number,
+ input.bytes.len() as u64,
+ input.bytes.clone(),
+ )
+ .await;
+ (input, result)
+ }
+ })
+ }),
}
}
@@ -218,40 +197,21 @@ where
}
};
- loop {
- if self.futures.has_remaining() {
- let cache = self.cache.take().expect("pending write must
exist");
- let part_number = self.next_part_number;
- self.next_part_number += 1;
-
- self.futures.push_back(WritePartFuture::new(
- self.w.clone(),
- upload_id.clone(),
- part_number,
- cache,
- ));
- let size = self.fill_cache(bs);
- return Ok(size);
- }
+ let bytes = self.cache.clone().expect("pending write must exist");
+ let part_number = self.next_part_number;
- if let Some(part) = self.futures.next().await {
- match part {
- Ok(part) => {
- self.parts.push(part);
- continue;
- }
- Err((part_number, bytes, err)) => {
- self.futures.push_front(WritePartFuture::new(
- self.w.clone(),
- upload_id.clone(),
- part_number,
- bytes,
- ));
- return Err(err);
- }
- }
- }
- }
+ self.tasks
+ .execute(WriteInput {
+ w: self.w.clone(),
+ upload_id: upload_id.clone(),
+ part_number,
+ bytes,
+ })
+ .await?;
+ self.cache = None;
+ self.next_part_number += 1;
+ let size = self.fill_cache(bs);
+ Ok(size)
}
async fn close(&mut self) -> Result<()> {
@@ -269,45 +229,29 @@ where
}
};
- loop {
- // futures queue is empty and cache is consumed, we can complete
the upload.
- if self.futures.is_empty() && self.cache.is_none() {
- return self.w.complete_part(&upload_id, &self.parts).await;
- }
+ if let Some(cache) = self.cache.clone() {
+ let part_number = self.next_part_number;
- if self.futures.has_remaining() {
- // This must be the final task.
- if let Some(cache) = self.cache.take() {
- let part_number = self.next_part_number;
- self.next_part_number += 1;
-
- self.futures.push_back(WritePartFuture::new(
- self.w.clone(),
- upload_id.clone(),
- part_number,
- cache,
- ));
- }
- }
+ self.tasks
+ .execute(WriteInput {
+ w: self.w.clone(),
+ upload_id: upload_id.clone(),
+ part_number,
+ bytes: cache,
+ })
+ .await?;
+ self.cache = None;
+ self.next_part_number += 1;
+ }
- if let Some(part) = self.futures.next().await {
- match part {
- Ok(part) => {
- self.parts.push(part);
- continue;
- }
- Err((part_number, bytes, err)) => {
- self.futures.push_front(WritePartFuture::new(
- self.w.clone(),
- upload_id.clone(),
- part_number,
- bytes,
- ));
- return Err(err);
- }
- }
- }
+ loop {
+ let Some(result) = self.tasks.next().await.transpose()? else {
+ break;
+ };
+ self.parts.push(result)
}
+
+ self.w.complete_part(&upload_id, &self.parts).await
}
async fn abort(&mut self) -> Result<()> {
@@ -315,7 +259,7 @@ where
return Ok(());
};
- self.futures.clear();
+ self.tasks.clear();
self.w.abort_part(&upload_id).await?;
self.cache = None;
Ok(())
@@ -324,12 +268,14 @@ where
#[cfg(test)]
mod tests {
- use std::sync::Mutex;
+ use std::time::Duration;
use pretty_assertions::assert_eq;
use rand::thread_rng;
use rand::Rng;
use rand::RngCore;
+ use tokio::sync::Mutex;
+ use tokio::time::sleep;
use super::*;
use crate::raw::oio::Write;
@@ -354,12 +300,12 @@ mod tests {
impl MultipartWrite for Arc<Mutex<TestWrite>> {
async fn write_once(&self, size: u64, _: Buffer) -> Result<()> {
- self.lock().unwrap().length += size;
+ self.lock().await.length += size;
Ok(())
}
async fn initiate_part(&self) -> Result<String> {
- let upload_id = self.lock().unwrap().upload_id.clone();
+ let upload_id = self.lock().await.upload_id.clone();
Ok(upload_id)
}
@@ -370,16 +316,24 @@ mod tests {
size: u64,
_: Buffer,
) -> Result<MultipartPart> {
- let mut test = self.lock().unwrap();
- assert_eq!(upload_id, test.upload_id);
+ {
+ let test = self.lock().await;
+ assert_eq!(upload_id, test.upload_id);
+ }
+
+ // Add an async sleep here to enforce some pending.
+ sleep(Duration::from_millis(50)).await;
- // We will have 50% percent rate for write part to fail.
- if thread_rng().gen_bool(5.0 / 10.0) {
+ // We will have 10% percent rate for write part to fail.
+ if thread_rng().gen_bool(1.0 / 10.0) {
return Err(Error::new(ErrorKind::Unexpected, "I'm a crazy
monkey!"));
}
- test.part_numbers.push(part_number);
- test.length += size;
+ {
+ let mut test = self.lock().await;
+ test.part_numbers.push(part_number);
+ test.length += size;
+ }
Ok(MultipartPart {
part_number,
@@ -389,7 +343,7 @@ mod tests {
}
async fn complete_part(&self, upload_id: &str, parts:
&[MultipartPart]) -> Result<()> {
- let test = self.lock().unwrap();
+ let test = self.lock().await;
assert_eq!(upload_id, test.upload_id);
assert_eq!(parts.len(), test.part_numbers.len());
@@ -397,7 +351,7 @@ mod tests {
}
async fn abort_part(&self, upload_id: &str) -> Result<()> {
- let test = self.lock().unwrap();
+ let test = self.lock().await;
assert_eq!(upload_id, test.upload_id);
Ok(())
@@ -408,7 +362,7 @@ mod tests {
async fn test_multipart_upload_writer_with_concurrent_errors() {
let mut rng = thread_rng();
- let mut w = MultipartWriter::new(TestWrite::new(), 8);
+ let mut w = MultipartWriter::new(TestWrite::new(),
Some(Executor::new()), 200);
let mut total_size = 0u64;
for _ in 0..1000 {
@@ -437,7 +391,7 @@ mod tests {
let expected_parts: Vec<_> = (0..1000).collect();
assert_eq!(actual_parts, expected_parts);
- let actual_size = w.w.lock().unwrap().length;
+ let actual_size = w.w.lock().await.length;
assert_eq!(actual_size, total_size);
}
}
diff --git a/core/src/services/aliyun_drive/backend.rs
b/core/src/services/aliyun_drive/backend.rs
index 79eb7169d9..47598e7190 100644
--- a/core/src/services/aliyun_drive/backend.rs
+++ b/core/src/services/aliyun_drive/backend.rs
@@ -461,10 +461,12 @@ impl Access for AliyunDriveBackend {
let parent_path = get_parent(path);
let parent_file_id = self.core.ensure_dir_exists(parent_path).await?;
+ let executor = args.executor().cloned();
+
let writer =
AliyunDriveWriter::new(self.core.clone(), &parent_file_id,
get_basename(path), args);
- let w = oio::MultipartWriter::new(writer, 1);
+ let w = oio::MultipartWriter::new(writer, executor, 1);
Ok((RpWrite::default(), w))
}
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
index 2a713e3057..2c99176b32 100644
--- a/core/src/services/b2/backend.rs
+++ b/core/src/services/b2/backend.rs
@@ -360,9 +360,10 @@ impl Access for B2Backend {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
let concurrent = args.concurrent();
+ let executor = args.executor().cloned();
let writer = B2Writer::new(self.core.clone(), path, args);
- let w = oio::MultipartWriter::new(writer, concurrent);
+ let w = oio::MultipartWriter::new(writer, executor, concurrent);
Ok((RpWrite::default(), w))
}
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index a4e9cfed09..4347a46407 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -336,7 +336,11 @@ impl Access for CosBackend {
let w = if args.append() {
CosWriters::Two(oio::AppendWriter::new(writer))
} else {
- CosWriters::One(oio::MultipartWriter::new(writer,
args.concurrent()))
+ CosWriters::One(oio::MultipartWriter::new(
+ writer,
+ args.executor().cloned(),
+ args.concurrent(),
+ ))
};
Ok((RpWrite::default(), w))
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 8f81509706..efeced3436 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -333,7 +333,11 @@ impl Access for ObsBackend {
let w = if args.append() {
ObsWriters::Two(oio::AppendWriter::new(writer))
} else {
- ObsWriters::One(oio::MultipartWriter::new(writer,
args.concurrent()))
+ ObsWriters::One(oio::MultipartWriter::new(
+ writer,
+ args.executor().cloned(),
+ args.concurrent(),
+ ))
};
Ok((RpWrite::default(), w))
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 9780b5ec70..fc57fabcf4 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -466,7 +466,11 @@ impl Access for OssBackend {
let w = if args.append() {
OssWriters::Two(oio::AppendWriter::new(writer))
} else {
- OssWriters::One(oio::MultipartWriter::new(writer,
args.concurrent()))
+ OssWriters::One(oio::MultipartWriter::new(
+ writer,
+ args.executor().cloned(),
+ args.concurrent(),
+ ))
};
Ok((RpWrite::default(), w))
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index c990413c1e..2f3a49b1f2 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -1107,9 +1107,10 @@ impl Access for S3Backend {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
let concurrent = args.concurrent();
+ let executor = args.executor().cloned();
let writer = S3Writer::new(self.core.clone(), path, args);
- let w = oio::MultipartWriter::new(writer, concurrent);
+ let w = oio::MultipartWriter::new(writer, executor, concurrent);
Ok((RpWrite::default(), w))
}
diff --git a/core/src/services/upyun/backend.rs
b/core/src/services/upyun/backend.rs
index bc3342917e..4603964665 100644
--- a/core/src/services/upyun/backend.rs
+++ b/core/src/services/upyun/backend.rs
@@ -304,9 +304,10 @@ impl Access for UpyunBackend {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
let concurrent = args.concurrent();
+ let executor = args.executor().cloned();
let writer = UpyunWriter::new(self.core.clone(), args,
path.to_string());
- let w = oio::MultipartWriter::new(writer, concurrent);
+ let w = oio::MultipartWriter::new(writer, executor, concurrent);
Ok((RpWrite::default(), w))
}
diff --git a/core/src/services/vercel_blob/backend.rs
b/core/src/services/vercel_blob/backend.rs
index c4857a5272..51489a702d 100644
--- a/core/src/services/vercel_blob/backend.rs
+++ b/core/src/services/vercel_blob/backend.rs
@@ -238,9 +238,10 @@ impl Access for VercelBlobBackend {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
let concurrent = args.concurrent();
+ let executor = args.executor().cloned();
let writer = VercelBlobWriter::new(self.core.clone(), args,
path.to_string());
- let w = oio::MultipartWriter::new(writer, concurrent);
+ let w = oio::MultipartWriter::new(writer, executor, concurrent);
Ok((RpWrite::default(), w))
}
diff --git a/core/src/types/execute/api.rs b/core/src/types/execute/api.rs
index 8307d87aae..95844c7f30 100644
--- a/core/src/types/execute/api.rs
+++ b/core/src/types/execute/api.rs
@@ -16,7 +16,6 @@
// under the License.
use crate::raw::BoxedStaticFuture;
-use crate::*;
use futures::future::RemoteHandle;
use futures::FutureExt;
use std::future::Future;
@@ -30,16 +29,12 @@ pub trait Execute: Send + Sync + 'static {
/// # Behavior
///
/// - Implementor must manage the executing futures and keep making
progress.
- /// - Implementor must return `Error::Unexpected` if failed to execute new
task.
- fn execute(&self, f: BoxedStaticFuture<()>) -> Result<()>;
+ fn execute(&self, f: BoxedStaticFuture<()>);
}
impl Execute for () {
- fn execute(&self, _: BoxedStaticFuture<()>) -> Result<()> {
- Err(Error::new(
- ErrorKind::Unexpected,
- "no executor has been set",
- ))
+ fn execute(&self, _: BoxedStaticFuture<()>) {
+ panic!("concurrent tasks executed with no executor has been enabled")
}
}
@@ -59,7 +54,6 @@ pub struct Task<T> {
impl<T: 'static> Task<T> {
/// Create a new task.
#[inline]
- #[allow(unused)]
pub fn new(handle: RemoteHandle<T>) -> Self {
Self { handle }
}
diff --git a/core/src/types/execute/executor.rs
b/core/src/types/execute/executor.rs
index d516e38aa8..47e0e5dbd9 100644
--- a/core/src/types/execute/executor.rs
+++ b/core/src/types/execute/executor.rs
@@ -17,7 +17,6 @@
use super::*;
use crate::raw::MaybeSend;
-use crate::*;
use futures::FutureExt;
use std::fmt::{Debug, Formatter};
use std::future::Future;
@@ -72,13 +71,13 @@ impl Executor {
/// Run given future in background immediately.
#[allow(unused)]
- pub(crate) fn execute<F>(&self, f: F) -> Result<Task<F::Output>>
+ pub(crate) fn execute<F>(&self, f: F) -> Task<F::Output>
where
F: Future + MaybeSend + 'static,
F::Output: MaybeSend + 'static,
{
let (fut, handle) = f.remote_handle();
- self.executor.execute(Box::pin(fut))?;
- Ok(Task::new(handle))
+ self.executor.execute(Box::pin(fut));
+ Task::new(handle)
}
}
diff --git a/core/src/types/execute/executors/tokio_executor.rs
b/core/src/types/execute/executors/tokio_executor.rs
index 73ecd87ec3..f6e0eb8373 100644
--- a/core/src/types/execute/executors/tokio_executor.rs
+++ b/core/src/types/execute/executors/tokio_executor.rs
@@ -24,9 +24,8 @@ pub struct TokioExecutor {}
impl Execute for TokioExecutor {
/// Tokio's JoinHandle has it's own `abort` support, so dropping handle
won't cancel the task.
- fn execute(&self, f: BoxedStaticFuture<()>) -> Result<()> {
+ fn execute(&self, f: BoxedStaticFuture<()>) {
let _handle = tokio::task::spawn(f);
- Ok(())
}
}
@@ -46,12 +45,10 @@ mod tests {
let finished = Arc::new(AtomicBool::new(false));
let finished_clone = finished.clone();
- let _task = executor
- .execute(async move {
- sleep(Duration::from_secs(1)).await;
- finished_clone.store(true, Ordering::Relaxed);
- })
- .unwrap();
+ let _task = executor.execute(async move {
+ sleep(Duration::from_secs(1)).await;
+ finished_clone.store(true, Ordering::Relaxed);
+ });
sleep(Duration::from_secs(2)).await;
// Task must has been finished even without await task.