This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 979ceb74df feat(core): Implement TimeoutLayer for concurrent tasks 
(#4688)
979ceb74df is described below

commit 979ceb74df6f63639c1a3f7449abd6ce487c3fc2
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jun 5 19:10:20 2024 +0800

    feat(core): Implement TimeoutLayer for concurrent tasks (#4688)
    
    * feat(core): Implement TimeoutLayer for concurrent tasks
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add some context
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/types/concurrent_tasks.rs    |  2 +-
 core/src/layers/timeout.rs                | 40 +++++++++++++++++++++++++--
 core/src/raw/futures_util.rs              |  4 +--
 core/src/raw/oio/write/multipart_write.rs | 46 +++++++++++++++++++++++--------
 core/src/types/execute/api.rs             | 33 +++++++++++++++++++++-
 core/src/types/execute/executor.rs        | 13 +++++++--
 core/src/types/read/buffer_stream.rs      |  6 +++-
 7 files changed, 124 insertions(+), 20 deletions(-)

diff --git a/core/benches/types/concurrent_tasks.rs 
b/core/benches/types/concurrent_tasks.rs
index 0ee9ba4729..ec691d95f5 100644
--- a/core/benches/types/concurrent_tasks.rs
+++ b/core/benches/types/concurrent_tasks.rs
@@ -30,7 +30,7 @@ pub fn bench_concurrent_tasks(c: &mut Criterion) {
         group.bench_with_input(concurrent.to_string(), &concurrent, |b, 
concurrent| {
             b.to_async(&*TOKIO).iter_batched(
                 || {
-                    ConcurrentTasks::new(Some(Executor::new()), *concurrent, 
|()| {
+                    ConcurrentTasks::new(Executor::new(), *concurrent, |()| {
                         Box::pin(async {
                             black_box(());
                             ((), Ok(()))
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index ace1b644fb..29497d44fa 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::future::Future;
+use std::sync::Arc;
 use std::time::Duration;
 
 use crate::raw::oio::ListOperation;
@@ -200,13 +201,27 @@ impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
             .await
     }
 
-    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+    async fn read(&self, path: &str, mut args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        if let Some(exec) = args.executor().cloned() {
+            args = args.with_executor(Executor::with(TimeoutExecutor::new(
+                exec.into_inner(),
+                self.io_timeout,
+            )));
+        }
+
         self.io_timeout(Operation::Read, self.inner.read(path, args))
             .await
             .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
     }
 
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+    async fn write(&self, path: &str, mut args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        if let Some(exec) = args.executor().cloned() {
+            args = args.with_executor(Executor::with(TimeoutExecutor::new(
+                exec.into_inner(),
+                self.io_timeout,
+            )));
+        }
+
         self.io_timeout(Operation::Write, self.inner.write(path, args))
             .await
             .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
@@ -260,6 +275,27 @@ impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
     }
 }
 
+pub struct TimeoutExecutor {
+    exec: Arc<dyn Execute>,
+    timeout: Duration,
+}
+
+impl TimeoutExecutor {
+    pub fn new(exec: Arc<dyn Execute>, timeout: Duration) -> Self {
+        Self { exec, timeout }
+    }
+}
+
+impl Execute for TimeoutExecutor {
+    fn execute(&self, f: BoxedStaticFuture<()>) {
+        self.exec.execute(f)
+    }
+
+    fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
+        Some(Box::pin(tokio::time::sleep(self.timeout)))
+    }
+}
+
 pub struct TimeoutWrapper<R> {
     inner: R,
 
diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs
index 31ef65be77..8a909b4f5b 100644
--- a/core/src/raw/futures_util.rs
+++ b/core/src/raw/futures_util.rs
@@ -101,12 +101,12 @@ impl<I: Send + 'static, O: Send + 'static> 
ConcurrentTasks<I, O> {
     ///
     /// The factory is a function pointer that shouldn't capture any context.
     pub fn new(
-        executor: Option<Executor>,
+        executor: Executor,
         concurrent: usize,
         factory: fn(I) -> BoxedStaticFuture<(I, Result<O>)>,
     ) -> Self {
         Self {
-            executor: executor.unwrap_or_default(),
+            executor,
             factory,
 
             tasks: VecDeque::with_capacity(concurrent),
diff --git a/core/src/raw/oio/write/multipart_write.rs 
b/core/src/raw/oio/write/multipart_write.rs
index 08b9598607..59e8d2725f 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -17,7 +17,8 @@
 
 use std::sync::Arc;
 
-use futures::Future;
+use futures::FutureExt;
+use futures::{select, Future};
 
 use crate::raw::*;
 use crate::*;
@@ -116,6 +117,7 @@ pub struct MultipartPart {
 
 struct WriteInput<W: MultipartWrite> {
     w: Arc<W>,
+    executor: Executor,
     upload_id: Arc<String>,
     part_number: usize,
     bytes: Buffer,
@@ -125,6 +127,7 @@ struct WriteInput<W: MultipartWrite> {
 /// uploads.
 pub struct MultipartWriter<W: MultipartWrite> {
     w: Arc<W>,
+    executor: Executor,
 
     upload_id: Option<Arc<String>>,
     parts: Vec<MultipartPart>,
@@ -142,8 +145,10 @@ impl<W: MultipartWrite> MultipartWriter<W> {
     /// Create a new MultipartWriter.
     pub fn new(inner: W, executor: Option<Executor>, concurrent: usize) -> 
Self {
         let w = Arc::new(inner);
+        let executor = executor.unwrap_or_default();
         Self {
             w,
+            executor: executor.clone(),
             upload_id: None,
             parts: Vec::new(),
             cache: None,
@@ -152,16 +157,33 @@ impl<W: MultipartWrite> MultipartWriter<W> {
             tasks: ConcurrentTasks::new(executor, concurrent, |input| {
                 Box::pin({
                     async move {
-                        let result = input
-                            .w
-                            .write_part(
-                                &input.upload_id,
-                                input.part_number,
-                                input.bytes.len() as u64,
-                                input.bytes.clone(),
-                            )
-                            .await;
-                        (input, result)
+                        let fut = input.w.write_part(
+                            &input.upload_id,
+                            input.part_number,
+                            input.bytes.len() as u64,
+                            input.bytes.clone(),
+                        );
+                        match input.executor.timeout() {
+                            None => {
+                                let result = fut.await;
+                                (input, result)
+                            }
+                            Some(timeout) => {
+                                let result = select! {
+                                    result = fut.fuse() => {
+                                        result
+                                    }
+                                    _ = timeout.fuse() => {
+                                        Err(Error::new(
+                                            ErrorKind::Unexpected, "write part 
timeout")
+                                                .with_context("upload_id", 
input.upload_id.to_string())
+                                                .with_context("part_number", 
input.part_number.to_string())
+                                                .set_temporary())
+                                    }
+                                };
+                                (input, result)
+                            }
+                        }
                     }
                 })
             }),
@@ -203,6 +225,7 @@ where
         self.tasks
             .execute(WriteInput {
                 w: self.w.clone(),
+                executor: self.executor.clone(),
                 upload_id: upload_id.clone(),
                 part_number,
                 bytes,
@@ -235,6 +258,7 @@ where
             self.tasks
                 .execute(WriteInput {
                     w: self.w.clone(),
+                    executor: self.executor.clone(),
                     upload_id: upload_id.clone(),
                     part_number,
                     bytes: cache,
diff --git a/core/src/types/execute/api.rs b/core/src/types/execute/api.rs
index 95844c7f30..52798b4bdf 100644
--- a/core/src/types/execute/api.rs
+++ b/core/src/types/execute/api.rs
@@ -23,13 +23,44 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 
 /// Execute trait is used to execute task in background.
+///
+/// # Notes about Timeout Implementation
+///
+/// Implementing a correct and elegant timeout mechanism is challenging for us.
+///
+/// The `Execute` trait must be object safe, allowing us to use `Arc<dyn 
Execute>`. Consequently,
+/// we cannot introduce a generic type parameter to `Execute`. We utilize 
[`RemoteHandle`] to
+/// implement the [`Execute::execute`] method. [`RemoteHandle`] operates by 
transmitting
+/// `Future::Output` through a channel, enabling the spawning of 
[`BoxedStaticFuture<()>`].
+///
+/// However, for timeouts, we need to spawn a future that resolves after a 
specified duration.
+/// Simply wrapping the future within another timeout future is not feasible 
because if the timeout
+/// is reached and the original future has not completed, it will be 
dropped—causing any held `Task`
+/// to panic.
+///
+/// As an alternative solution, we developed a `timeout` API. Users of the 
`Executor` should invoke
+/// this API when they require a timeout and combine it with their own futures 
using
+/// [`futures::select`].
+///
+/// This approach may seem inelegant but it allows us flexibility without 
being tied specifically
+/// to the Tokio runtime.
+///
+/// PLEASE raise an issue if you have a better solution.
 pub trait Execute: Send + Sync + 'static {
     /// Execute async task in background.
     ///
     /// # Behavior
     ///
-    /// - Implementor must manage the executing futures and keep making 
progress.
+    /// - Implementor MUST manage the executing futures and keep making 
progress.
+    /// - Implementor MUST NOT drop futures until they are resolved.
     fn execute(&self, f: BoxedStaticFuture<()>);
+
+    /// Return a future that will be resolved after the given timeout.
+    ///
+    /// Default implementation returns None.
+    fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
+        None
+    }
 }
 
 impl Execute for () {
diff --git a/core/src/types/execute/executor.rs 
b/core/src/types/execute/executor.rs
index 47e0e5dbd9..b5f8321d9f 100644
--- a/core/src/types/execute/executor.rs
+++ b/core/src/types/execute/executor.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use super::*;
-use crate::raw::MaybeSend;
+use crate::raw::{BoxedStaticFuture, MaybeSend};
 use futures::FutureExt;
 use std::fmt::{Debug, Formatter};
 use std::future::Future;
@@ -69,8 +69,17 @@ impl Executor {
         }
     }
 
+    /// Return the inner executor.
+    pub(crate) fn into_inner(self) -> Arc<dyn Execute> {
+        self.executor
+    }
+
+    /// Return a future that will be resolved after the given timeout.
+    pub(crate) fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
+        self.executor.timeout()
+    }
+
     /// Run given future in background immediately.
-    #[allow(unused)]
     pub(crate) fn execute<F>(&self, f: F) -> Task<F::Output>
     where
         F: Future + MaybeSend + 'static,
diff --git a/core/src/types/read/buffer_stream.rs 
b/core/src/types/read/buffer_stream.rs
index 63494fb3bd..669165ec70 100644
--- a/core/src/types/read/buffer_stream.rs
+++ b/core/src/types/read/buffer_stream.rs
@@ -81,9 +81,13 @@ pub struct ChunkedReader {
 
 impl ChunkedReader {
     /// Create a new chunked reader.
+    ///
+    /// # Notes
+    ///
+    /// We don't need to handle `Executor::timeout` since we are outside of 
the layer.
     fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
         let tasks = ConcurrentTasks::new(
-            ctx.args().executor().cloned(),
+            ctx.args().executor().cloned().unwrap_or_default(),
             ctx.options().concurrent(),
             |mut r: oio::Reader| {
                 Box::pin(async {

Reply via email to