This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new ebcdfb4ca refactor!: Refactor oio::Delete to make its API simple 
(#6823)
ebcdfb4ca is described below

commit ebcdfb4cab6c51c13ada399e8e3107a1bd157099
Author: Xuanwo <[email protected]>
AuthorDate: Thu Nov 27 16:40:26 2025 +0800

    refactor!: Refactor oio::Delete to make its API simple (#6823)
    
    * refactor!: Refactor oio::Delete to make its API simple
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix tests
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix batch delete
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix tests
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/blocking/delete.rs                  |  5 --
 core/src/layers/async_backtrace.rs           |  9 +--
 core/src/layers/await_tree.rs                |  8 +--
 core/src/layers/concurrent_limit.rs          |  8 +--
 core/src/layers/correctness_check.rs         | 14 ++---
 core/src/layers/error_context.rs             | 22 +++-----
 core/src/layers/fastrace.rs                  | 11 ++--
 core/src/layers/logging.rs                   | 27 +++------
 core/src/layers/observe/metrics.rs           |  7 ++-
 core/src/layers/retry.rs                     | 74 +++++++++++++++----------
 core/src/layers/tail_cut.rs                  |  8 +--
 core/src/layers/timeout.rs                   |  8 +--
 core/src/layers/tracing.rs                   |  8 +--
 core/src/raw/oio/delete/api.rs               | 53 +++++++++---------
 core/src/raw/oio/delete/batch_delete.rs      | 82 ++++++++++++++++++++--------
 core/src/raw/oio/delete/one_shot_delete.rs   | 34 ++----------
 core/src/services/azblob/backend.rs          |  5 +-
 core/src/services/cloudflare_kv/backend.rs   |  5 +-
 core/src/services/gcs/backend.rs             |  5 +-
 core/src/services/oss/backend.rs             |  5 +-
 core/src/services/s3/backend.rs              |  5 +-
 core/src/types/delete/deleter.rs             | 33 +----------
 core/src/types/delete/futures_delete_sink.rs | 33 +----------
 core/src/types/operator/operator.rs          |  4 +-
 integrations/object_store/src/service/mod.rs |  5 +-
 25 files changed, 222 insertions(+), 256 deletions(-)

diff --git a/core/src/blocking/delete.rs b/core/src/blocking/delete.rs
index 41d091d63..e8b65157f 100644
--- a/core/src/blocking/delete.rs
+++ b/core/src/blocking/delete.rs
@@ -62,11 +62,6 @@ impl Deleter {
         self.handle.block_on(self.inner.delete_try_iter(try_iter))
     }
 
-    /// Flush the deleter, returns the number of deleted paths.
-    pub fn flush(&mut self) -> Result<usize> {
-        self.handle.block_on(self.inner.flush())
-    }
-
     /// Close the deleter, this will flush the deleter and wait until all 
paths are deleted.
     pub fn close(&mut self) -> Result<()> {
         self.handle.block_on(self.inner.close())
diff --git a/core/src/layers/async_backtrace.rs 
b/core/src/layers/async_backtrace.rs
index e68c37817..7cbc1516b 100644
--- a/core/src/layers/async_backtrace.rs
+++ b/core/src/layers/async_backtrace.rs
@@ -162,12 +162,13 @@ impl<R: oio::List> oio::List for AsyncBacktraceWrapper<R> 
{
 }
 
 impl<R: oio::Delete> oio::Delete for AsyncBacktraceWrapper<R> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        self.inner.delete(path, args)
+    #[async_backtrace::framed]
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        self.inner.delete(path, args).await
     }
 
     #[async_backtrace::framed]
-    async fn flush(&mut self) -> Result<usize> {
-        self.inner.flush().await
+    async fn close(&mut self) -> Result<()> {
+        self.inner.close().await
     }
 }
diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs
index 416d814ca..449721c72 100644
--- a/core/src/layers/await_tree.rs
+++ b/core/src/layers/await_tree.rs
@@ -188,13 +188,13 @@ impl<R: oio::List> oio::List for AwaitTreeWrapper<R> {
 }
 
 impl<R: oio::Delete> oio::Delete for AwaitTreeWrapper<R> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        self.inner.delete(path, args)
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        self.inner.delete(path, args).await
     }
 
-    async fn flush(&mut self) -> Result<usize> {
+    async fn close(&mut self) -> Result<()> {
         self.inner
-            .flush()
+            .close()
             .instrument_await(format!("opendal::{}", Operation::Delete))
             .await
     }
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index 0f42007ba..4263963e5 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -279,11 +279,11 @@ impl<R: oio::List> oio::List for 
ConcurrentLimitWrapper<R> {
 }
 
 impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        self.inner.delete(path, args)
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        self.inner.delete(path, args).await
     }
 
-    async fn flush(&mut self) -> Result<usize> {
-        self.inner.flush().await
+    async fn close(&mut self) -> Result<()> {
+        self.inner.close().await
     }
 }
diff --git a/core/src/layers/correctness_check.rs 
b/core/src/layers/correctness_check.rs
index 0b8c962ff..ca24fe11c 100644
--- a/core/src/layers/correctness_check.rs
+++ b/core/src/layers/correctness_check.rs
@@ -239,13 +239,13 @@ impl<T> CheckWrapper<T> {
 }
 
 impl<T: oio::Delete> oio::Delete for CheckWrapper<T> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
         self.check_delete(&args)?;
-        self.inner.delete(path, args)
+        self.inner.delete(path, args).await
     }
 
-    fn flush(&mut self) -> impl Future<Output = Result<usize>> + MaybeSend {
-        self.inner.flush()
+    fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
+        self.inner.close()
     }
 }
 
@@ -317,12 +317,12 @@ mod tests {
     struct MockDeleter;
 
     impl oio::Delete for MockDeleter {
-        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
+        async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
             Ok(())
         }
 
-        async fn flush(&mut self) -> Result<usize> {
-            Ok(1)
+        async fn close(&mut self) -> Result<()> {
+            Ok(())
         }
     }
 
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 305c62bce..1bd7a4dcd 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -285,8 +285,8 @@ impl<T: oio::List> oio::List for ErrorContextWrapper<T> {
 }
 
 impl<T: oio::Delete> oio::Delete for ErrorContextWrapper<T> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        self.inner.delete(path, args).map_err(|err| {
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        self.inner.delete(path, args).await.map_err(|err| {
             err.with_operation(Operation::Delete)
                 .with_context("service", self.scheme)
                 .with_context("path", path)
@@ -294,17 +294,11 @@ impl<T: oio::Delete> oio::Delete for 
ErrorContextWrapper<T> {
         })
     }
 
-    async fn flush(&mut self) -> Result<usize> {
-        self.inner
-            .flush()
-            .await
-            .inspect(|&n| {
-                self.processed += n as u64;
-            })
-            .map_err(|err| {
-                err.with_operation(Operation::Delete)
-                    .with_context("service", self.scheme)
-                    .with_context("deleted", self.processed.to_string())
-            })
+    async fn close(&mut self) -> Result<()> {
+        self.inner.close().await.map_err(|err| {
+            err.with_operation(Operation::Delete)
+                .with_context("service", self.scheme)
+                .with_context("deleted", self.processed.to_string())
+        })
     }
 }
diff --git a/core/src/layers/fastrace.rs b/core/src/layers/fastrace.rs
index 18dd50ca5..fb81d8cce 100644
--- a/core/src/layers/fastrace.rs
+++ b/core/src/layers/fastrace.rs
@@ -258,14 +258,13 @@ impl<R: oio::List> oio::List for FastraceWrapper<R> {
 }
 
 impl<R: oio::Delete> oio::Delete for FastraceWrapper<R> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        let _g = self.span.set_local_parent();
-        let _span = 
LocalSpan::enter_with_local_parent(Operation::Delete.into_static());
-        self.inner.delete(path, args)
+    #[trace(enter_on_poll = true)]
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        self.inner.delete(path, args).await
     }
 
     #[trace(enter_on_poll = true)]
-    async fn flush(&mut self) -> Result<usize> {
-        self.inner.flush().await
+    async fn close(&mut self) -> Result<()> {
+        self.inner.close().await
     }
 }
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 82c48f87f..c64fb21ef 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -753,7 +753,6 @@ pub struct LoggingDeleter<D, I: LoggingInterceptor> {
     info: Arc<AccessorInfo>,
     logger: I,
 
-    queued: usize,
     deleted: usize,
     inner: D,
 }
@@ -764,7 +763,6 @@ impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
             info,
             logger,
 
-            queued: 0,
             deleted: 0,
             inner,
         }
@@ -772,17 +770,17 @@ impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
 }
 
 impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, 
I> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
         let version = args
             .version()
             .map(|v| v.to_string())
             .unwrap_or_else(|| "<latest>".to_string());
 
-        let res = self.inner.delete(path, args);
+        let res = self.inner.delete(path, args).await;
 
         match &res {
             Ok(_) => {
-                self.queued += 1;
+                self.deleted += 1;
             }
             Err(err) => {
                 self.logger.log(
@@ -791,7 +789,6 @@ impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for 
LoggingDeleter<D, I>
                     &[
                         ("path", path),
                         ("version", &version),
-                        ("queued", &self.queued.to_string()),
                         ("deleted", &self.deleted.to_string()),
                     ],
                     "failed",
@@ -803,20 +800,15 @@ impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete 
for LoggingDeleter<D, I>
         res
     }
 
-    async fn flush(&mut self) -> Result<usize> {
-        let res = self.inner.flush().await;
+    async fn close(&mut self) -> Result<()> {
+        let res = self.inner.close().await;
 
         match &res {
-            Ok(flushed) => {
-                self.queued -= flushed;
-                self.deleted += flushed;
+            Ok(_) => {
                 self.logger.log(
                     &self.info,
                     Operation::Delete,
-                    &[
-                        ("queued", &self.queued.to_string()),
-                        ("deleted", &self.deleted.to_string()),
-                    ],
+                    &[("deleted", &self.deleted.to_string())],
                     "succeeded",
                     None,
                 );
@@ -825,10 +817,7 @@ impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete 
for LoggingDeleter<D, I>
                 self.logger.log(
                     &self.info,
                     Operation::Delete,
-                    &[
-                        ("queued", &self.queued.to_string()),
-                        ("deleted", &self.deleted.to_string()),
-                    ],
+                    &[("deleted", &self.deleted.to_string())],
                     "failed",
                     Some(err),
                 );
diff --git a/core/src/layers/observe/metrics.rs 
b/core/src/layers/observe/metrics.rs
index ea779ece1..7441d82f9 100644
--- a/core/src/layers/observe/metrics.rs
+++ b/core/src/layers/observe/metrics.rs
@@ -935,9 +935,10 @@ impl<R: oio::List, I: MetricsIntercept> oio::List for 
MetricsWrapper<R, I> {
 }
 
 impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for MetricsWrapper<R, I> 
{
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
         self.inner
             .delete(path, args)
+            .await
             .inspect(|_| {
                 self.size += 1;
             })
@@ -949,8 +950,8 @@ impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for 
MetricsWrapper<R, I> {
             })
     }
 
-    async fn flush(&mut self) -> Result<usize> {
-        self.inner.flush().await.inspect_err(|err| {
+    async fn close(&mut self) -> Result<()> {
+        self.inner.close().await.inspect_err(|err| {
             self.interceptor.observe(
                 self.labels.clone().with_error(err.kind()),
                 MetricValue::OperationErrorsTotal,
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 59a63830f..5ee852d15 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -20,7 +20,6 @@ use std::fmt::Formatter;
 use std::sync::Arc;
 use std::time::Duration;
 
-use backon::BlockingRetryable;
 use backon::ExponentialBuilder;
 use backon::Retryable;
 use log::warn;
@@ -562,25 +561,43 @@ impl<P: oio::List, I: RetryInterceptor> oio::List for 
RetryWrapper<P, I> {
 }
 
 impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
-            .retry(self.builder)
-            .when(|e| e.is_temporary())
-            .notify(|err, dur| {
-                self.notify.intercept(err, dur);
-            })
-            .call()
-            .map_err(|e| e.set_persistent())
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        use backon::RetryableWithContext;
+
+        let inner = self.take_inner()?;
+        let path = path.to_string();
+        let args_cloned = args.clone();
+
+        let (inner, res) = {
+            |mut p: P| {
+                let path = path.clone();
+                let args = args_cloned.clone();
+                async move {
+                    let res = p.delete(&path, args).await;
+                    (p, res)
+                }
+            }
+        }
+        .retry(self.builder)
+        .when(|e| e.is_temporary())
+        .context(inner)
+        .notify(|err, dur| {
+            self.notify.intercept(err, dur);
+        })
+        .await;
+
+        self.inner = Some(inner);
+        res.map_err(|e| e.set_persistent())
     }
 
-    async fn flush(&mut self) -> Result<usize> {
+    async fn close(&mut self) -> Result<()> {
         use backon::RetryableWithContext;
 
         let inner = self.take_inner()?;
 
         let (inner, res) = {
             |mut p: P| async move {
-                let res = p.flush().await;
+                let res = p.close().await;
 
                 (p, res)
             }
@@ -598,7 +615,6 @@ impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for 
RetryWrapper<P, I> {
 
 #[cfg(test)]
 mod tests {
-    use std::mem;
     use std::sync::Arc;
     use std::sync::Mutex;
 
@@ -790,12 +806,12 @@ mod tests {
     }
 
     impl oio::Delete for MockDeleter {
-        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
+        async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
             self.size += 1;
             Ok(())
         }
 
-        async fn flush(&mut self) -> Result<usize> {
+        async fn close(&mut self) -> Result<()> {
             let mut attempt = self.attempt.lock().unwrap();
             *attempt += 1;
 
@@ -804,21 +820,23 @@ mod tests {
                     Error::new(ErrorKind::Unexpected, "retryable_error from 
deleter")
                         .set_temporary(),
                 ),
-                2 => {
-                    self.size -= 1;
-                    Ok(1)
+                2..=4 => {
+                    self.size = self.size.saturating_sub(1);
+                    Err(
+                        Error::new(ErrorKind::Unexpected, "retryable_error 
from deleter")
+                            .set_temporary(),
+                    )
                 }
-                3 => Err(
-                    Error::new(ErrorKind::Unexpected, "retryable_error from 
deleter")
-                        .set_temporary(),
-                ),
-                4 => Err(
-                    Error::new(ErrorKind::Unexpected, "retryable_error from 
deleter")
-                        .set_temporary(),
-                ),
                 5 => {
-                    let s = mem::take(&mut self.size);
-                    Ok(s)
+                    self.size = self.size.saturating_sub(1);
+                    if self.size == 0 {
+                        Ok(())
+                    } else {
+                        Err(
+                            Error::new(ErrorKind::Unexpected, "retryable_error 
from deleter")
+                                .set_temporary(),
+                        )
+                    }
                 }
                 _ => unreachable!(),
             }
diff --git a/core/src/layers/tail_cut.rs b/core/src/layers/tail_cut.rs
index 42a071c60..ab89e5083 100644
--- a/core/src/layers/tail_cut.rs
+++ b/core/src/layers/tail_cut.rs
@@ -587,11 +587,11 @@ impl<R: oio::List> oio::List for TailCutWrapper<R> {
 }
 
 impl<R: oio::Delete> oio::Delete for TailCutWrapper<R> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        self.inner.delete(path, args)
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        self.inner.delete(path, args).await
     }
 
-    async fn flush(&mut self) -> Result<usize> {
+    async fn close(&mut self) -> Result<()> {
         let deadline = self.calculate_deadline(Operation::Delete);
         Self::with_io_deadline(
             deadline,
@@ -599,7 +599,7 @@ impl<R: oio::Delete> oio::Delete for TailCutWrapper<R> {
             &self.stats,
             self.size,
             Operation::Delete,
-            self.inner.flush(),
+            self.inner.close(),
         )
         .await
     }
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 5240e8f0c..b74bd3f42 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -341,12 +341,12 @@ impl<R: oio::List> oio::List for TimeoutWrapper<R> {
 }
 
 impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        self.inner.delete(path, args)
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        self.inner.delete(path, args).await
     }
 
-    async fn flush(&mut self) -> Result<usize> {
-        let fut = self.inner.flush();
+    async fn close(&mut self) -> Result<()> {
+        let fut = self.inner.close();
         Self::io_timeout(self.timeout, Operation::Delete.into_static(), 
fut).await
     }
 }
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 00d88b5f7..e980477b4 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -335,15 +335,15 @@ impl<R: oio::List> oio::List for TracingWrapper<R> {
 }
 
 impl<R: oio::Delete> oio::Delete for TracingWrapper<R> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
         let _enter = self.span.enter();
 
-        self.inner.delete(path, args)
+        self.inner.delete(path, args).await
     }
 
-    async fn flush(&mut self) -> Result<usize> {
+    async fn close(&mut self) -> Result<()> {
         let _enter = self.span.enter();
 
-        self.inner.flush().await
+        self.inner.close().await
     }
 }
diff --git a/core/src/raw/oio/delete/api.rs b/core/src/raw/oio/delete/api.rs
index 95053444c..d21191e0b 100644
--- a/core/src/raw/oio/delete/api.rs
+++ b/core/src/raw/oio/delete/api.rs
@@ -39,35 +39,30 @@ pub trait Delete: Unpin + Send + Sync {
     /// - `Err(err)`: An error occurred and the deletion request was not queued
     ///
     /// # Notes
-    /// This method just queue the delete request. The actual deletion will be
-    /// performed when `flush` is called.
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()>;
+    /// This method just queues the delete request. The actual deletion will be
+    /// performed when `close` is called.
+    fn delete<'a>(
+        &'a mut self,
+        path: &'a str,
+        args: OpDelete,
+    ) -> impl Future<Output = Result<()>> + MaybeSend + 'a;
 
-    /// Flushes the deletion queue to ensure queued deletions are executed
-    ///
-    /// # Returns
-    /// - `Ok(0)`: All queued deletions have been processed or the queue is 
empty.
-    /// - `Ok(count)`: The number of resources successfully deleted. 
Implementations should
-    ///   return an error if the queue is non-empty but no resources were 
deleted
-    /// - `Err(err)`: An error occurred while performing the deletions
-    ///
-    /// # Notes
-    /// - This method is asynchronous and will wait for queued deletions to 
complete
-    fn flush(&mut self) -> impl Future<Output = Result<usize>> + MaybeSend;
+    /// Close the deleter and ensure all queued deletions are executed.
+    fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
 }
 
 impl Delete for () {
-    fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
+    async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "output deleter doesn't support delete",
         ))
     }
 
-    async fn flush(&mut self) -> Result<usize> {
+    async fn close(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
-            "output deleter doesn't support flush",
+            "output deleter doesn't support close",
         ))
     }
 }
@@ -75,28 +70,32 @@ impl Delete for () {
 /// The dyn version of [`Delete`]
 pub trait DeleteDyn: Unpin + Send + Sync {
     /// The dyn version of [`Delete::delete`]
-    fn delete_dyn(&mut self, path: &str, args: OpDelete) -> Result<()>;
+    fn delete_dyn<'a>(&'a mut self, path: &'a str, args: OpDelete) -> 
BoxedFuture<'a, Result<()>>;
 
-    /// The dyn version of [`Delete::flush`]
-    fn flush_dyn(&mut self) -> BoxedFuture<'_, Result<usize>>;
+    /// The dyn version of [`Delete::close`]
+    fn close_dyn(&mut self) -> BoxedFuture<'_, Result<()>>;
 }
 
 impl<T: Delete + ?Sized> DeleteDyn for T {
-    fn delete_dyn(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        Delete::delete(self, path, args)
+    fn delete_dyn<'a>(&'a mut self, path: &'a str, args: OpDelete) -> 
BoxedFuture<'a, Result<()>> {
+        Box::pin(Delete::delete(self, path, args))
     }
 
-    fn flush_dyn(&mut self) -> BoxedFuture<'_, Result<usize>> {
-        Box::pin(self.flush())
+    fn close_dyn(&mut self) -> BoxedFuture<'_, Result<()>> {
+        Box::pin(self.close())
     }
 }
 
 impl<T: DeleteDyn + ?Sized> Delete for Box<T> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+    fn delete<'a>(
+        &'a mut self,
+        path: &'a str,
+        args: OpDelete,
+    ) -> impl Future<Output = Result<()>> + MaybeSend + 'a {
         self.deref_mut().delete_dyn(path, args)
     }
 
-    async fn flush(&mut self) -> Result<usize> {
-        self.deref_mut().flush_dyn().await
+    async fn close(&mut self) -> Result<()> {
+        self.deref_mut().close_dyn().await
     }
 }
diff --git a/core/src/raw/oio/delete/batch_delete.rs 
b/core/src/raw/oio/delete/batch_delete.rs
index d244a22ad..63d262148 100644
--- a/core/src/raw/oio/delete/batch_delete.rs
+++ b/core/src/raw/oio/delete/batch_delete.rs
@@ -23,7 +23,7 @@ use crate::*;
 
 /// BatchDelete is used to implement [`oio::Delete`] based on batch delete 
operation.
 ///
-/// OneShotDeleter will perform delete operation while calling `flush`.
+/// OneShotDeleter will perform delete operation while calling `close`.
 pub trait BatchDelete: Send + Sync + Unpin + 'static {
     /// delete_once delete one path at once.
     ///
@@ -59,28 +59,30 @@ pub struct BatchDeleteResult {
 pub struct BatchDeleter<D: BatchDelete> {
     inner: D,
     buffer: HashSet<(String, OpDelete)>,
+    max_batch_size: usize,
 }
 
 impl<D: BatchDelete> BatchDeleter<D> {
     /// Create a new batch deleter.
-    pub fn new(inner: D) -> Self {
+    pub fn new(inner: D, max_batch_size: Option<usize>) -> Self {
+        debug_assert!(
+            max_batch_size.is_some(),
+            "BatchDeleter requires delete_max_size to be configured"
+        );
+        let max_batch_size = max_batch_size.unwrap_or(1);
+
         Self {
             inner,
             buffer: HashSet::default(),
+            max_batch_size,
         }
     }
-}
-
-impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        self.buffer.insert((path.to_string(), args));
-        Ok(())
-    }
 
-    async fn flush(&mut self) -> Result<usize> {
+    async fn flush_buffer(&mut self) -> Result<usize> {
         if self.buffer.is_empty() {
             return Ok(0);
         }
+
         if self.buffer.len() == 1 {
             let (path, args) = self
                 .buffer
@@ -95,23 +97,26 @@ impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
 
         let batch = self.buffer.iter().cloned().collect();
         let result = self.inner.delete_batch(batch).await?;
-        debug_assert!(
-            !result.succeeded.is_empty(),
-            "the number of succeeded operations must be greater than 0"
-        );
-        debug_assert_eq!(
-            result.succeeded.len() + result.failed.len(),
-            self.buffer.len(),
-            "the number of succeeded and failed operations must be equal to 
the input batch size"
-        );
 
-        // Remove all succeeded operations from the buffer.
-        let deleted = result.succeeded.len();
+        if result.succeeded.is_empty() {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "batch delete returned zero successes",
+            ));
+        }
+        if result.succeeded.len() + result.failed.len() != self.buffer.len() {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "batch delete result size mismatch",
+            ));
+        }
+
+        let mut deleted = 0;
         for i in result.succeeded {
             self.buffer.remove(&i);
+            deleted += 1;
         }
 
-        // Return directly if there are non-temporary errors.
         for (path, op, err) in result.failed {
             if !err.is_temporary() {
                 return Err(err
@@ -120,8 +125,37 @@ impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
             }
         }
 
-        // Return the number of succeeded operations to allow users to decide 
whether
-        // to retry or push more delete operations.
         Ok(deleted)
     }
 }
+
+impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        self.buffer.insert((path.to_string(), args));
+        if self.buffer.len() >= self.max_batch_size {
+            let _ = self.flush_buffer().await?;
+            return Ok(());
+        }
+
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        loop {
+            let deleted = self.flush_buffer().await?;
+
+            if self.buffer.is_empty() {
+                break;
+            }
+
+            if deleted == 0 {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "batch delete made no progress while buffer remains",
+                ));
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/core/src/raw/oio/delete/one_shot_delete.rs 
b/core/src/raw/oio/delete/one_shot_delete.rs
index 1df00547a..711d5fba8 100644
--- a/core/src/raw/oio/delete/one_shot_delete.rs
+++ b/core/src/raw/oio/delete/one_shot_delete.rs
@@ -22,7 +22,7 @@ use crate::*;
 
 /// OneShotDelete is used to implement [`oio::Delete`] based on one shot 
operation.
 ///
-/// OneShotDeleter will perform delete operation while calling `flush`.
+/// OneShotDeleter will perform delete operation while calling `close`.
 pub trait OneShotDelete: Send + Sync + Unpin + 'static {
     /// delete_once delete one path at once.
     ///
@@ -37,43 +37,21 @@ pub trait OneShotDelete: Send + Sync + Unpin + 'static {
 /// OneShotDelete is used to implement [`oio::Delete`] based on one shot.
 pub struct OneShotDeleter<D> {
     inner: D,
-    delete: Option<(String, OpDelete)>,
 }
 
 impl<D> OneShotDeleter<D> {
     /// Create a new one shot deleter.
     pub fn new(inner: D) -> Self {
-        Self {
-            inner,
-            delete: None,
-        }
-    }
-
-    fn delete_inner(&mut self, path: String, args: OpDelete) -> Result<()> {
-        if self.delete.is_some() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "OneShotDeleter doesn't support batch delete",
-            ));
-        }
-
-        self.delete = Some((path, args));
-        Ok(())
+        Self { inner }
     }
 }
 
 impl<D: OneShotDelete> oio::Delete for OneShotDeleter<D> {
-    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        self.delete_inner(path.to_string(), args)
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        self.inner.delete_once(path.to_string(), args).await
     }
 
-    async fn flush(&mut self) -> Result<usize> {
-        let Some((path, args)) = self.delete.clone() else {
-            return Ok(0);
-        };
-
-        self.inner.delete_once(path, args).await?;
-        self.delete = None;
-        Ok(1)
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
     }
 }
diff --git a/core/src/services/azblob/backend.rs 
b/core/src/services/azblob/backend.rs
index 02cf03630..0aedc029c 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -521,7 +521,10 @@ impl Access for AzblobBackend {
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
         Ok((
             RpDelete::default(),
-            oio::BatchDeleter::new(AzblobDeleter::new(self.core.clone())),
+            oio::BatchDeleter::new(
+                AzblobDeleter::new(self.core.clone()),
+                self.core.info.full_capability().delete_max_size,
+            ),
         ))
     }
 
diff --git a/core/src/services/cloudflare_kv/backend.rs 
b/core/src/services/cloudflare_kv/backend.rs
index a02515654..ec8a4ab13 100644
--- a/core/src/services/cloudflare_kv/backend.rs
+++ b/core/src/services/cloudflare_kv/backend.rs
@@ -480,7 +480,10 @@ impl Access for CloudflareKvBackend {
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
         Ok((
             RpDelete::default(),
-            oio::BatchDeleter::new(CloudflareKvDeleter::new(self.core.clone())),
+            oio::BatchDeleter::new(
+                CloudflareKvDeleter::new(self.core.clone()),
+                self.core.info.full_capability().delete_max_size,
+            ),
         ))
     }
 
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 9c556d032..c57cd9b00 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -429,7 +429,10 @@ impl Access for GcsBackend {
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
         Ok((
             RpDelete::default(),
-            oio::BatchDeleter::new(GcsDeleter::new(self.core.clone())),
+            oio::BatchDeleter::new(
+                GcsDeleter::new(self.core.clone()),
+                self.core.info.full_capability().delete_max_size,
+            ),
         ))
     }
 
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 2e5f9f33a..0e6a5aa01 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -656,7 +656,10 @@ impl Access for OssBackend {
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
         Ok((
             RpDelete::default(),
-            oio::BatchDeleter::new(OssDeleter::new(self.core.clone())),
+            oio::BatchDeleter::new(
+                OssDeleter::new(self.core.clone()),
+                self.core.info.full_capability().delete_max_size,
+            ),
         ))
     }
 
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index c5d42ba5f..7c0e72000 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -1059,7 +1059,10 @@ impl Access for S3Backend {
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
         Ok((
             RpDelete::default(),
-            oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
+            oio::BatchDeleter::new(
+                S3Deleter::new(self.core.clone()),
+                self.core.info.full_capability().delete_max_size,
+            ),
         ))
     }
 
diff --git a/core/src/types/delete/deleter.rs b/core/src/types/delete/deleter.rs
index 4675b9751..05621f0f7 100644
--- a/core/src/types/delete/deleter.rs
+++ b/core/src/types/delete/deleter.rs
@@ -83,38 +83,24 @@ use crate::*;
 /// ```
 pub struct Deleter {
     deleter: oio::Deleter,
-
-    max_size: usize,
-    cur_size: usize,
 }
 
 impl Deleter {
     pub(crate) async fn create(acc: Accessor) -> Result<Self> {
-        let max_size = acc.info().full_capability().delete_max_size.unwrap_or(1);
         let (_, deleter) = acc.delete().await?;
 
-        Ok(Self {
-            deleter,
-            max_size,
-            cur_size: 0,
-        })
+        Ok(Self { deleter })
     }
 
     /// Delete a path.
     pub async fn delete(&mut self, input: impl IntoDeleteInput) -> Result<()> {
-        if self.cur_size >= self.max_size {
-            let deleted = self.deleter.flush_dyn().await?;
-            self.cur_size -= deleted;
-        }
-
         let input = input.into_delete_input();
         let mut op = OpDelete::default();
         if let Some(version) = &input.version {
             op = op.with_version(version);
         }
 
-        self.deleter.delete_dyn(&input.path, op)?;
-        self.cur_size += 1;
+        self.deleter.delete_dyn(&input.path, op).await?;
         Ok(())
     }
 
@@ -195,22 +181,9 @@ impl Deleter {
         Ok(())
     }
 
-    /// Flush the deleter, returns the number of deleted paths.
-    pub async fn flush(&mut self) -> Result<usize> {
-        let deleted = self.deleter.flush_dyn().await?;
-        self.cur_size -= deleted;
-        Ok(deleted)
-    }
-
     /// Close the deleter, this will flush the deleter and wait until all paths are deleted.
     pub async fn close(&mut self) -> Result<()> {
-        loop {
-            self.flush().await?;
-            if self.cur_size == 0 {
-                break;
-            }
-        }
-        Ok(())
+        self.deleter.close_dyn().await
     }
 
     /// Convert the deleter into a sink.
diff --git a/core/src/types/delete/futures_delete_sink.rs b/core/src/types/delete/futures_delete_sink.rs
index 793b3e717..5600ebafc 100644
--- a/core/src/types/delete/futures_delete_sink.rs
+++ b/core/src/types/delete/futures_delete_sink.rs
@@ -35,7 +35,6 @@ pub struct FuturesDeleteSink<T: IntoDeleteInput> {
 enum State {
     Idle(Option<Deleter>),
     Delete(BoxedStaticFuture<(Deleter, Result<()>)>),
-    Flush(BoxedStaticFuture<(Deleter, Result<usize>)>),
     Close(BoxedStaticFuture<(Deleter, Result<()>)>),
 }
 
@@ -60,11 +59,6 @@ impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink<T> {
                 self.state = State::Idle(Some(deleter));
                 Poll::Ready(res.map(|_| ()))
             }
-            State::Flush(fut) => {
-                let (deleter, res) = ready!(fut.as_mut().poll(cx));
-                self.state = State::Idle(Some(deleter));
-                Poll::Ready(res.map(|_| ()))
-            }
             State::Close(fut) => {
                 let (deleter, res) = ready!(fut.as_mut().poll(cx));
                 self.state = State::Idle(Some(deleter));
@@ -100,32 +94,13 @@ impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink<T> {
     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
         loop {
             match &mut self.state {
-                State::Idle(deleter) => {
-                    let mut deleter = deleter.take().ok_or_else(|| {
-                        Error::new(
-                            ErrorKind::Unexpected,
-                            "FuturesDeleteSink has been closed or errored",
-                        )
-                    })?;
-                    let fut = async move {
-                        let res = deleter.flush().await;
-                        (deleter, res)
-                    };
-                    self.state = State::Flush(Box::pin(fut));
-                    return Poll::Ready(Ok(()));
-                }
+                State::Idle(_) => return Poll::Ready(Ok(())),
                 State::Delete(fut) => {
                     let (deleter, res) = ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(deleter));
                     res?;
                     continue;
                 }
-                State::Flush(fut) => {
-                    let (deleter, res) = ready!(fut.as_mut().poll(cx));
-                    self.state = State::Idle(Some(deleter));
-                    let _ = res?;
-                    return Poll::Ready(Ok(()));
-                }
                 State::Close(fut) => {
                     let (deleter, res) = ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(deleter));
@@ -159,12 +134,6 @@ impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink<T> {
                     res?;
                     continue;
                 }
-                State::Flush(fut) => {
-                    let (deleter, res) = ready!(fut.as_mut().poll(cx));
-                    self.state = State::Idle(Some(deleter));
-                    res?;
-                    continue;
-                }
                 State::Close(fut) => {
                     let (deleter, res) = ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(deleter));
diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs
index 5f255ab52..82c722e76 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -1336,8 +1336,8 @@ impl Operator {
     async fn delete_inner(acc: Accessor, path: String, opts: options::DeleteOptions) -> Result<()> {
         let (_, mut deleter) = acc.delete_dyn().await?;
         let args = opts.into();
-        deleter.delete_dyn(&path, args)?;
-        deleter.flush_dyn().await?;
+        deleter.delete_dyn(&path, args).await?;
+        deleter.close_dyn().await?;
         Ok(())
     }
 
diff --git a/integrations/object_store/src/service/mod.rs b/integrations/object_store/src/service/mod.rs
index 825a2503a..f7bb8f7bf 100644
--- a/integrations/object_store/src/service/mod.rs
+++ b/integrations/object_store/src/service/mod.rs
@@ -143,7 +143,7 @@ impl Access for ObjectStoreService {
     }
 
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
-        let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()));
+        let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()), Some(1000));
         Ok((RpDelete::default(), deleter))
     }
 
@@ -355,8 +355,9 @@ mod tests {
         let (_, mut deleter) = backend.delete().await.expect("delete should succeed");
         deleter
             .delete(path, OpDelete::default())
+            .await
             .expect("delete should succeed");
-        deleter.flush().await.expect("flush should succeed");
+        deleter.close().await.expect("close should succeed");
 
         // Verify file is deleted
         let result = backend.stat(path, OpStat::default()).await;


Reply via email to