This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 979ceb74df feat(core): Implement TimeoutLayer for concurrent tasks
(#4688)
979ceb74df is described below
commit 979ceb74df6f63639c1a3f7449abd6ce487c3fc2
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jun 5 19:10:20 2024 +0800
feat(core): Implement TimeoutLayer for concurrent tasks (#4688)
* feat(core): Implement TimeoutLayer for concurrent tasks
Signed-off-by: Xuanwo <[email protected]>
* Add some context
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/benches/types/concurrent_tasks.rs | 2 +-
core/src/layers/timeout.rs | 40 +++++++++++++++++++++++++--
core/src/raw/futures_util.rs | 4 +--
core/src/raw/oio/write/multipart_write.rs | 46 +++++++++++++++++++++++--------
core/src/types/execute/api.rs | 33 +++++++++++++++++++++-
core/src/types/execute/executor.rs | 13 +++++++--
core/src/types/read/buffer_stream.rs | 6 +++-
7 files changed, 124 insertions(+), 20 deletions(-)
diff --git a/core/benches/types/concurrent_tasks.rs
b/core/benches/types/concurrent_tasks.rs
index 0ee9ba4729..ec691d95f5 100644
--- a/core/benches/types/concurrent_tasks.rs
+++ b/core/benches/types/concurrent_tasks.rs
@@ -30,7 +30,7 @@ pub fn bench_concurrent_tasks(c: &mut Criterion) {
group.bench_with_input(concurrent.to_string(), &concurrent, |b,
concurrent| {
b.to_async(&*TOKIO).iter_batched(
|| {
- ConcurrentTasks::new(Some(Executor::new()), *concurrent,
|()| {
+ ConcurrentTasks::new(Executor::new(), *concurrent, |()| {
Box::pin(async {
black_box(());
((), Ok(()))
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index ace1b644fb..29497d44fa 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -16,6 +16,7 @@
// under the License.
use std::future::Future;
+use std::sync::Arc;
use std::time::Duration;
use crate::raw::oio::ListOperation;
@@ -200,13 +201,27 @@ impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
.await
}
- async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ async fn read(&self, path: &str, mut args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ if let Some(exec) = args.executor().cloned() {
+ args = args.with_executor(Executor::with(TimeoutExecutor::new(
+ exec.into_inner(),
+ self.io_timeout,
+ )));
+ }
+
self.io_timeout(Operation::Read, self.inner.read(path, args))
.await
.map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
}
- async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ async fn write(&self, path: &str, mut args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ if let Some(exec) = args.executor().cloned() {
+ args = args.with_executor(Executor::with(TimeoutExecutor::new(
+ exec.into_inner(),
+ self.io_timeout,
+ )));
+ }
+
self.io_timeout(Operation::Write, self.inner.write(path, args))
.await
.map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
@@ -260,6 +275,27 @@ impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
}
}
+pub struct TimeoutExecutor {
+ exec: Arc<dyn Execute>,
+ timeout: Duration,
+}
+
+impl TimeoutExecutor {
+ pub fn new(exec: Arc<dyn Execute>, timeout: Duration) -> Self {
+ Self { exec, timeout }
+ }
+}
+
+impl Execute for TimeoutExecutor {
+ fn execute(&self, f: BoxedStaticFuture<()>) {
+ self.exec.execute(f)
+ }
+
+ fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
+ Some(Box::pin(tokio::time::sleep(self.timeout)))
+ }
+}
+
pub struct TimeoutWrapper<R> {
inner: R,
diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs
index 31ef65be77..8a909b4f5b 100644
--- a/core/src/raw/futures_util.rs
+++ b/core/src/raw/futures_util.rs
@@ -101,12 +101,12 @@ impl<I: Send + 'static, O: Send + 'static>
ConcurrentTasks<I, O> {
///
/// The factory is a function pointer that shouldn't capture any context.
pub fn new(
- executor: Option<Executor>,
+ executor: Executor,
concurrent: usize,
factory: fn(I) -> BoxedStaticFuture<(I, Result<O>)>,
) -> Self {
Self {
- executor: executor.unwrap_or_default(),
+ executor,
factory,
tasks: VecDeque::with_capacity(concurrent),
diff --git a/core/src/raw/oio/write/multipart_write.rs
b/core/src/raw/oio/write/multipart_write.rs
index 08b9598607..59e8d2725f 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -17,7 +17,8 @@
use std::sync::Arc;
-use futures::Future;
+use futures::FutureExt;
+use futures::{select, Future};
use crate::raw::*;
use crate::*;
@@ -116,6 +117,7 @@ pub struct MultipartPart {
struct WriteInput<W: MultipartWrite> {
w: Arc<W>,
+ executor: Executor,
upload_id: Arc<String>,
part_number: usize,
bytes: Buffer,
@@ -125,6 +127,7 @@ struct WriteInput<W: MultipartWrite> {
/// uploads.
pub struct MultipartWriter<W: MultipartWrite> {
w: Arc<W>,
+ executor: Executor,
upload_id: Option<Arc<String>>,
parts: Vec<MultipartPart>,
@@ -142,8 +145,10 @@ impl<W: MultipartWrite> MultipartWriter<W> {
/// Create a new MultipartWriter.
pub fn new(inner: W, executor: Option<Executor>, concurrent: usize) ->
Self {
let w = Arc::new(inner);
+ let executor = executor.unwrap_or_default();
Self {
w,
+ executor: executor.clone(),
upload_id: None,
parts: Vec::new(),
cache: None,
@@ -152,16 +157,33 @@ impl<W: MultipartWrite> MultipartWriter<W> {
tasks: ConcurrentTasks::new(executor, concurrent, |input| {
Box::pin({
async move {
- let result = input
- .w
- .write_part(
- &input.upload_id,
- input.part_number,
- input.bytes.len() as u64,
- input.bytes.clone(),
- )
- .await;
- (input, result)
+ let fut = input.w.write_part(
+ &input.upload_id,
+ input.part_number,
+ input.bytes.len() as u64,
+ input.bytes.clone(),
+ );
+ match input.executor.timeout() {
+ None => {
+ let result = fut.await;
+ (input, result)
+ }
+ Some(timeout) => {
+ let result = select! {
+ result = fut.fuse() => {
+ result
+ }
+ _ = timeout.fuse() => {
+ Err(Error::new(
+ ErrorKind::Unexpected, "write part
timeout")
+ .with_context("upload_id",
input.upload_id.to_string())
+ .with_context("part_number",
input.part_number.to_string())
+ .set_temporary())
+ }
+ };
+ (input, result)
+ }
+ }
}
})
}),
@@ -203,6 +225,7 @@ where
self.tasks
.execute(WriteInput {
w: self.w.clone(),
+ executor: self.executor.clone(),
upload_id: upload_id.clone(),
part_number,
bytes,
@@ -235,6 +258,7 @@ where
self.tasks
.execute(WriteInput {
w: self.w.clone(),
+ executor: self.executor.clone(),
upload_id: upload_id.clone(),
part_number,
bytes: cache,
diff --git a/core/src/types/execute/api.rs b/core/src/types/execute/api.rs
index 95844c7f30..52798b4bdf 100644
--- a/core/src/types/execute/api.rs
+++ b/core/src/types/execute/api.rs
@@ -23,13 +23,44 @@ use std::pin::Pin;
use std::task::{Context, Poll};
/// Execute trait is used to execute task in background.
+///
+/// # Notes about Timeout Implementation
+///
+/// Implementing a correct and elegant timeout mechanism is challenging for us.
+///
+/// The `Execute` trait must be object safe, allowing us to use `Arc<dyn
Execute>`. Consequently,
+/// we cannot introduce a generic type parameter to `Execute`. We utilize
[`RemoteHandle`] to
+/// implement the [`Execute::execute`] method. [`RemoteHandle`] operates by
transmitting
+/// `Future::Output` through a channel, enabling the spawning of
[`BoxedStaticFuture<()>`].
+///
+/// However, for timeouts, we need to spawn a future that resolves after a
specified duration.
+/// Simply wrapping the future within another timeout future is not feasible
because if the timeout
+/// is reached and the original future has not completed, it will be
dropped—causing any held `Task`
+/// to panic.
+///
+/// As an alternative solution, we developed a `timeout` API. Users of the
`Executor` should invoke
+/// this API when they require a timeout and combine it with their own futures
using
+/// [`futures::select`].
+///
+/// This approach may seem inelegant but it allows us flexibility without
being tied specifically
+/// to the Tokio runtime.
+///
+/// PLEASE raise an issue if you have a better solution.
pub trait Execute: Send + Sync + 'static {
/// Execute async task in background.
///
/// # Behavior
///
- /// - Implementor must manage the executing futures and keep making
progress.
+ /// - Implementor MUST manage the executing futures and keep making
progress.
+ /// - Implementor MUST NOT drop futures until it's resolved.
fn execute(&self, f: BoxedStaticFuture<()>);
+
+ /// Return a future that will be resolved after the given timeout.
+ ///
+ /// Default implementation returns None.
+ fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
+ None
+ }
}
impl Execute for () {
diff --git a/core/src/types/execute/executor.rs
b/core/src/types/execute/executor.rs
index 47e0e5dbd9..b5f8321d9f 100644
--- a/core/src/types/execute/executor.rs
+++ b/core/src/types/execute/executor.rs
@@ -16,7 +16,7 @@
// under the License.
use super::*;
-use crate::raw::MaybeSend;
+use crate::raw::{BoxedStaticFuture, MaybeSend};
use futures::FutureExt;
use std::fmt::{Debug, Formatter};
use std::future::Future;
@@ -69,8 +69,17 @@ impl Executor {
}
}
+ /// Return the inner executor.
+ pub(crate) fn into_inner(self) -> Arc<dyn Execute> {
+ self.executor
+ }
+
+ /// Return a future that will be resolved after the given timeout.
+ pub(crate) fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
+ self.executor.timeout()
+ }
+
/// Run given future in background immediately.
- #[allow(unused)]
pub(crate) fn execute<F>(&self, f: F) -> Task<F::Output>
where
F: Future + MaybeSend + 'static,
diff --git a/core/src/types/read/buffer_stream.rs
b/core/src/types/read/buffer_stream.rs
index 63494fb3bd..669165ec70 100644
--- a/core/src/types/read/buffer_stream.rs
+++ b/core/src/types/read/buffer_stream.rs
@@ -81,9 +81,13 @@ pub struct ChunkedReader {
impl ChunkedReader {
/// Create a new chunked reader.
+ ///
+ /// # Notes
+ ///
+ /// We don't need to handle `Executor::timeout` since we are outside of
the layer.
fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
let tasks = ConcurrentTasks::new(
- ctx.args().executor().cloned(),
+ ctx.args().executor().cloned().unwrap_or_default(),
ctx.options().concurrent(),
|mut r: oio::Reader| {
Box::pin(async {