This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new ebcdfb4ca refactor!: Refactor oio::Delete to make its API simple
(#6823)
ebcdfb4ca is described below
commit ebcdfb4cab6c51c13ada399e8e3107a1bd157099
Author: Xuanwo <[email protected]>
AuthorDate: Thu Nov 27 16:40:26 2025 +0800
refactor!: Refactor oio::Delete to make its API simple (#6823)
* refactor!: Refactor oio::Delete to make its API simple
Signed-off-by: Xuanwo <[email protected]>
* Fix tests
Signed-off-by: Xuanwo <[email protected]>
* Fix batch delete
Signed-off-by: Xuanwo <[email protected]>
* Fix tests
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/blocking/delete.rs | 5 --
core/src/layers/async_backtrace.rs | 9 +--
core/src/layers/await_tree.rs | 8 +--
core/src/layers/concurrent_limit.rs | 8 +--
core/src/layers/correctness_check.rs | 14 ++---
core/src/layers/error_context.rs | 22 +++-----
core/src/layers/fastrace.rs | 11 ++--
core/src/layers/logging.rs | 27 +++------
core/src/layers/observe/metrics.rs | 7 ++-
core/src/layers/retry.rs | 74 +++++++++++++++----------
core/src/layers/tail_cut.rs | 8 +--
core/src/layers/timeout.rs | 8 +--
core/src/layers/tracing.rs | 8 +--
core/src/raw/oio/delete/api.rs | 53 +++++++++---------
core/src/raw/oio/delete/batch_delete.rs | 82 ++++++++++++++++++++--------
core/src/raw/oio/delete/one_shot_delete.rs | 34 ++----------
core/src/services/azblob/backend.rs | 5 +-
core/src/services/cloudflare_kv/backend.rs | 5 +-
core/src/services/gcs/backend.rs | 5 +-
core/src/services/oss/backend.rs | 5 +-
core/src/services/s3/backend.rs | 5 +-
core/src/types/delete/deleter.rs | 33 +----------
core/src/types/delete/futures_delete_sink.rs | 33 +----------
core/src/types/operator/operator.rs | 4 +-
integrations/object_store/src/service/mod.rs | 5 +-
25 files changed, 222 insertions(+), 256 deletions(-)
diff --git a/core/src/blocking/delete.rs b/core/src/blocking/delete.rs
index 41d091d63..e8b65157f 100644
--- a/core/src/blocking/delete.rs
+++ b/core/src/blocking/delete.rs
@@ -62,11 +62,6 @@ impl Deleter {
self.handle.block_on(self.inner.delete_try_iter(try_iter))
}
- /// Flush the deleter, returns the number of deleted paths.
- pub fn flush(&mut self) -> Result<usize> {
- self.handle.block_on(self.inner.flush())
- }
-
/// Close the deleter, this will flush the deleter and wait until all
paths are deleted.
pub fn close(&mut self) -> Result<()> {
self.handle.block_on(self.inner.close())
diff --git a/core/src/layers/async_backtrace.rs
b/core/src/layers/async_backtrace.rs
index e68c37817..7cbc1516b 100644
--- a/core/src/layers/async_backtrace.rs
+++ b/core/src/layers/async_backtrace.rs
@@ -162,12 +162,13 @@ impl<R: oio::List> oio::List for AsyncBacktraceWrapper<R>
{
}
impl<R: oio::Delete> oio::Delete for AsyncBacktraceWrapper<R> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
- self.inner.delete(path, args)
+ #[async_backtrace::framed]
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ self.inner.delete(path, args).await
}
#[async_backtrace::framed]
- async fn flush(&mut self) -> Result<usize> {
- self.inner.flush().await
+ async fn close(&mut self) -> Result<()> {
+ self.inner.close().await
}
}
diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs
index 416d814ca..449721c72 100644
--- a/core/src/layers/await_tree.rs
+++ b/core/src/layers/await_tree.rs
@@ -188,13 +188,13 @@ impl<R: oio::List> oio::List for AwaitTreeWrapper<R> {
}
impl<R: oio::Delete> oio::Delete for AwaitTreeWrapper<R> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
- self.inner.delete(path, args)
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ self.inner.delete(path, args).await
}
- async fn flush(&mut self) -> Result<usize> {
+ async fn close(&mut self) -> Result<()> {
self.inner
- .flush()
+ .close()
.instrument_await(format!("opendal::{}", Operation::Delete))
.await
}
diff --git a/core/src/layers/concurrent_limit.rs
b/core/src/layers/concurrent_limit.rs
index 0f42007ba..4263963e5 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -279,11 +279,11 @@ impl<R: oio::List> oio::List for
ConcurrentLimitWrapper<R> {
}
impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
- self.inner.delete(path, args)
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ self.inner.delete(path, args).await
}
- async fn flush(&mut self) -> Result<usize> {
- self.inner.flush().await
+ async fn close(&mut self) -> Result<()> {
+ self.inner.close().await
}
}
diff --git a/core/src/layers/correctness_check.rs
b/core/src/layers/correctness_check.rs
index 0b8c962ff..ca24fe11c 100644
--- a/core/src/layers/correctness_check.rs
+++ b/core/src/layers/correctness_check.rs
@@ -239,13 +239,13 @@ impl<T> CheckWrapper<T> {
}
impl<T: oio::Delete> oio::Delete for CheckWrapper<T> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.check_delete(&args)?;
- self.inner.delete(path, args)
+ self.inner.delete(path, args).await
}
- fn flush(&mut self) -> impl Future<Output = Result<usize>> + MaybeSend {
- self.inner.flush()
+ fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
+ self.inner.close()
}
}
@@ -317,12 +317,12 @@ mod tests {
struct MockDeleter;
impl oio::Delete for MockDeleter {
- fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
+ async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
Ok(())
}
- async fn flush(&mut self) -> Result<usize> {
- Ok(1)
+ async fn close(&mut self) -> Result<()> {
+ Ok(())
}
}
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 305c62bce..1bd7a4dcd 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -285,8 +285,8 @@ impl<T: oio::List> oio::List for ErrorContextWrapper<T> {
}
impl<T: oio::Delete> oio::Delete for ErrorContextWrapper<T> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
- self.inner.delete(path, args).map_err(|err| {
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ self.inner.delete(path, args).await.map_err(|err| {
err.with_operation(Operation::Delete)
.with_context("service", self.scheme)
.with_context("path", path)
@@ -294,17 +294,11 @@ impl<T: oio::Delete> oio::Delete for
ErrorContextWrapper<T> {
})
}
- async fn flush(&mut self) -> Result<usize> {
- self.inner
- .flush()
- .await
- .inspect(|&n| {
- self.processed += n as u64;
- })
- .map_err(|err| {
- err.with_operation(Operation::Delete)
- .with_context("service", self.scheme)
- .with_context("deleted", self.processed.to_string())
- })
+ async fn close(&mut self) -> Result<()> {
+ self.inner.close().await.map_err(|err| {
+ err.with_operation(Operation::Delete)
+ .with_context("service", self.scheme)
+ .with_context("deleted", self.processed.to_string())
+ })
}
}
diff --git a/core/src/layers/fastrace.rs b/core/src/layers/fastrace.rs
index 18dd50ca5..fb81d8cce 100644
--- a/core/src/layers/fastrace.rs
+++ b/core/src/layers/fastrace.rs
@@ -258,14 +258,13 @@ impl<R: oio::List> oio::List for FastraceWrapper<R> {
}
impl<R: oio::Delete> oio::Delete for FastraceWrapper<R> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
- let _g = self.span.set_local_parent();
- let _span =
LocalSpan::enter_with_local_parent(Operation::Delete.into_static());
- self.inner.delete(path, args)
+ #[trace(enter_on_poll = true)]
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ self.inner.delete(path, args).await
}
#[trace(enter_on_poll = true)]
- async fn flush(&mut self) -> Result<usize> {
- self.inner.flush().await
+ async fn close(&mut self) -> Result<()> {
+ self.inner.close().await
}
}
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 82c48f87f..c64fb21ef 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -753,7 +753,6 @@ pub struct LoggingDeleter<D, I: LoggingInterceptor> {
info: Arc<AccessorInfo>,
logger: I,
- queued: usize,
deleted: usize,
inner: D,
}
@@ -764,7 +763,6 @@ impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
info,
logger,
- queued: 0,
deleted: 0,
inner,
}
@@ -772,17 +770,17 @@ impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
}
impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D,
I> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
let version = args
.version()
.map(|v| v.to_string())
.unwrap_or_else(|| "<latest>".to_string());
- let res = self.inner.delete(path, args);
+ let res = self.inner.delete(path, args).await;
match &res {
Ok(_) => {
- self.queued += 1;
+ self.deleted += 1;
}
Err(err) => {
self.logger.log(
@@ -791,7 +789,6 @@ impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for
LoggingDeleter<D, I>
&[
("path", path),
("version", &version),
- ("queued", &self.queued.to_string()),
("deleted", &self.deleted.to_string()),
],
"failed",
@@ -803,20 +800,15 @@ impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete
for LoggingDeleter<D, I>
res
}
- async fn flush(&mut self) -> Result<usize> {
- let res = self.inner.flush().await;
+ async fn close(&mut self) -> Result<()> {
+ let res = self.inner.close().await;
match &res {
- Ok(flushed) => {
- self.queued -= flushed;
- self.deleted += flushed;
+ Ok(_) => {
self.logger.log(
&self.info,
Operation::Delete,
- &[
- ("queued", &self.queued.to_string()),
- ("deleted", &self.deleted.to_string()),
- ],
+ &[("deleted", &self.deleted.to_string())],
"succeeded",
None,
);
@@ -825,10 +817,7 @@ impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete
for LoggingDeleter<D, I>
self.logger.log(
&self.info,
Operation::Delete,
- &[
- ("queued", &self.queued.to_string()),
- ("deleted", &self.deleted.to_string()),
- ],
+ &[("deleted", &self.deleted.to_string())],
"failed",
Some(err),
);
diff --git a/core/src/layers/observe/metrics.rs
b/core/src/layers/observe/metrics.rs
index ea779ece1..7441d82f9 100644
--- a/core/src/layers/observe/metrics.rs
+++ b/core/src/layers/observe/metrics.rs
@@ -935,9 +935,10 @@ impl<R: oio::List, I: MetricsIntercept> oio::List for
MetricsWrapper<R, I> {
}
impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for MetricsWrapper<R, I>
{
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner
.delete(path, args)
+ .await
.inspect(|_| {
self.size += 1;
})
@@ -949,8 +950,8 @@ impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for
MetricsWrapper<R, I> {
})
}
- async fn flush(&mut self) -> Result<usize> {
- self.inner.flush().await.inspect_err(|err| {
+ async fn close(&mut self) -> Result<()> {
+ self.inner.close().await.inspect_err(|err| {
self.interceptor.observe(
self.labels.clone().with_error(err.kind()),
MetricValue::OperationErrorsTotal,
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 59a63830f..5ee852d15 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -20,7 +20,6 @@ use std::fmt::Formatter;
use std::sync::Arc;
use std::time::Duration;
-use backon::BlockingRetryable;
use backon::ExponentialBuilder;
use backon::Retryable;
use log::warn;
@@ -562,25 +561,43 @@ impl<P: oio::List, I: RetryInterceptor> oio::List for
RetryWrapper<P, I> {
}
impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
- { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
- .retry(self.builder)
- .when(|e| e.is_temporary())
- .notify(|err, dur| {
- self.notify.intercept(err, dur);
- })
- .call()
- .map_err(|e| e.set_persistent())
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ use backon::RetryableWithContext;
+
+ let inner = self.take_inner()?;
+ let path = path.to_string();
+ let args_cloned = args.clone();
+
+ let (inner, res) = {
+ |mut p: P| {
+ let path = path.clone();
+ let args = args_cloned.clone();
+ async move {
+ let res = p.delete(&path, args).await;
+ (p, res)
+ }
+ }
+ }
+ .retry(self.builder)
+ .when(|e| e.is_temporary())
+ .context(inner)
+ .notify(|err, dur| {
+ self.notify.intercept(err, dur);
+ })
+ .await;
+
+ self.inner = Some(inner);
+ res.map_err(|e| e.set_persistent())
}
- async fn flush(&mut self) -> Result<usize> {
+ async fn close(&mut self) -> Result<()> {
use backon::RetryableWithContext;
let inner = self.take_inner()?;
let (inner, res) = {
|mut p: P| async move {
- let res = p.flush().await;
+ let res = p.close().await;
(p, res)
}
@@ -598,7 +615,6 @@ impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for
RetryWrapper<P, I> {
#[cfg(test)]
mod tests {
- use std::mem;
use std::sync::Arc;
use std::sync::Mutex;
@@ -790,12 +806,12 @@ mod tests {
}
impl oio::Delete for MockDeleter {
- fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
+ async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
self.size += 1;
Ok(())
}
- async fn flush(&mut self) -> Result<usize> {
+ async fn close(&mut self) -> Result<()> {
let mut attempt = self.attempt.lock().unwrap();
*attempt += 1;
@@ -804,21 +820,23 @@ mod tests {
Error::new(ErrorKind::Unexpected, "retryable_error from
deleter")
.set_temporary(),
),
- 2 => {
- self.size -= 1;
- Ok(1)
+ 2..=4 => {
+ self.size = self.size.saturating_sub(1);
+ Err(
+ Error::new(ErrorKind::Unexpected, "retryable_error
from deleter")
+ .set_temporary(),
+ )
}
- 3 => Err(
- Error::new(ErrorKind::Unexpected, "retryable_error from
deleter")
- .set_temporary(),
- ),
- 4 => Err(
- Error::new(ErrorKind::Unexpected, "retryable_error from
deleter")
- .set_temporary(),
- ),
5 => {
- let s = mem::take(&mut self.size);
- Ok(s)
+ self.size = self.size.saturating_sub(1);
+ if self.size == 0 {
+ Ok(())
+ } else {
+ Err(
+ Error::new(ErrorKind::Unexpected, "retryable_error
from deleter")
+ .set_temporary(),
+ )
+ }
}
_ => unreachable!(),
}
diff --git a/core/src/layers/tail_cut.rs b/core/src/layers/tail_cut.rs
index 42a071c60..ab89e5083 100644
--- a/core/src/layers/tail_cut.rs
+++ b/core/src/layers/tail_cut.rs
@@ -587,11 +587,11 @@ impl<R: oio::List> oio::List for TailCutWrapper<R> {
}
impl<R: oio::Delete> oio::Delete for TailCutWrapper<R> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
- self.inner.delete(path, args)
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ self.inner.delete(path, args).await
}
- async fn flush(&mut self) -> Result<usize> {
+ async fn close(&mut self) -> Result<()> {
let deadline = self.calculate_deadline(Operation::Delete);
Self::with_io_deadline(
deadline,
@@ -599,7 +599,7 @@ impl<R: oio::Delete> oio::Delete for TailCutWrapper<R> {
&self.stats,
self.size,
Operation::Delete,
- self.inner.flush(),
+ self.inner.close(),
)
.await
}
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 5240e8f0c..b74bd3f42 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -341,12 +341,12 @@ impl<R: oio::List> oio::List for TimeoutWrapper<R> {
}
impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
- self.inner.delete(path, args)
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ self.inner.delete(path, args).await
}
- async fn flush(&mut self) -> Result<usize> {
- let fut = self.inner.flush();
+ async fn close(&mut self) -> Result<()> {
+ let fut = self.inner.close();
Self::io_timeout(self.timeout, Operation::Delete.into_static(),
fut).await
}
}
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 00d88b5f7..e980477b4 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -335,15 +335,15 @@ impl<R: oio::List> oio::List for TracingWrapper<R> {
}
impl<R: oio::Delete> oio::Delete for TracingWrapper<R> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
let _enter = self.span.enter();
- self.inner.delete(path, args)
+ self.inner.delete(path, args).await
}
- async fn flush(&mut self) -> Result<usize> {
+ async fn close(&mut self) -> Result<()> {
let _enter = self.span.enter();
- self.inner.flush().await
+ self.inner.close().await
}
}
diff --git a/core/src/raw/oio/delete/api.rs b/core/src/raw/oio/delete/api.rs
index 95053444c..d21191e0b 100644
--- a/core/src/raw/oio/delete/api.rs
+++ b/core/src/raw/oio/delete/api.rs
@@ -39,35 +39,30 @@ pub trait Delete: Unpin + Send + Sync {
/// - `Err(err)`: An error occurred and the deletion request was not queued
///
/// # Notes
- /// This method just queue the delete request. The actual deletion will be
- /// performed when `flush` is called.
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()>;
+ /// This method just queues the delete request. The actual deletion will be
+ /// performed when `close` is called.
+ fn delete<'a>(
+ &'a mut self,
+ path: &'a str,
+ args: OpDelete,
+ ) -> impl Future<Output = Result<()>> + MaybeSend + 'a;
- /// Flushes the deletion queue to ensure queued deletions are executed
- ///
- /// # Returns
- /// - `Ok(0)`: All queued deletions have been processed or the queue is
empty.
- /// - `Ok(count)`: The number of resources successfully deleted.
Implementations should
- /// return an error if the queue is non-empty but no resources were
deleted
- /// - `Err(err)`: An error occurred while performing the deletions
- ///
- /// # Notes
- /// - This method is asynchronous and will wait for queued deletions to
complete
- fn flush(&mut self) -> impl Future<Output = Result<usize>> + MaybeSend;
+ /// Close the deleter and ensure all queued deletions are executed.
+ fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
}
impl Delete for () {
- fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
+ async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
"output deleter doesn't support delete",
))
}
- async fn flush(&mut self) -> Result<usize> {
+ async fn close(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
- "output deleter doesn't support flush",
+ "output deleter doesn't support close",
))
}
}
@@ -75,28 +70,32 @@ impl Delete for () {
/// The dyn version of [`Delete`]
pub trait DeleteDyn: Unpin + Send + Sync {
/// The dyn version of [`Delete::delete`]
- fn delete_dyn(&mut self, path: &str, args: OpDelete) -> Result<()>;
+ fn delete_dyn<'a>(&'a mut self, path: &'a str, args: OpDelete) ->
BoxedFuture<'a, Result<()>>;
- /// The dyn version of [`Delete::flush`]
- fn flush_dyn(&mut self) -> BoxedFuture<'_, Result<usize>>;
+ /// The dyn version of [`Delete::close`]
+ fn close_dyn(&mut self) -> BoxedFuture<'_, Result<()>>;
}
impl<T: Delete + ?Sized> DeleteDyn for T {
- fn delete_dyn(&mut self, path: &str, args: OpDelete) -> Result<()> {
- Delete::delete(self, path, args)
+ fn delete_dyn<'a>(&'a mut self, path: &'a str, args: OpDelete) ->
BoxedFuture<'a, Result<()>> {
+ Box::pin(Delete::delete(self, path, args))
}
- fn flush_dyn(&mut self) -> BoxedFuture<'_, Result<usize>> {
- Box::pin(self.flush())
+ fn close_dyn(&mut self) -> BoxedFuture<'_, Result<()>> {
+ Box::pin(self.close())
}
}
impl<T: DeleteDyn + ?Sized> Delete for Box<T> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ fn delete<'a>(
+ &'a mut self,
+ path: &'a str,
+ args: OpDelete,
+ ) -> impl Future<Output = Result<()>> + MaybeSend + 'a {
self.deref_mut().delete_dyn(path, args)
}
- async fn flush(&mut self) -> Result<usize> {
- self.deref_mut().flush_dyn().await
+ async fn close(&mut self) -> Result<()> {
+ self.deref_mut().close_dyn().await
}
}
diff --git a/core/src/raw/oio/delete/batch_delete.rs
b/core/src/raw/oio/delete/batch_delete.rs
index d244a22ad..63d262148 100644
--- a/core/src/raw/oio/delete/batch_delete.rs
+++ b/core/src/raw/oio/delete/batch_delete.rs
@@ -23,7 +23,7 @@ use crate::*;
/// BatchDelete is used to implement [`oio::Delete`] based on batch delete
operation.
///
-/// OneShotDeleter will perform delete operation while calling `flush`.
+/// OneShotDeleter will perform delete operation while calling `close`.
pub trait BatchDelete: Send + Sync + Unpin + 'static {
/// delete_once delete one path at once.
///
@@ -59,28 +59,30 @@ pub struct BatchDeleteResult {
pub struct BatchDeleter<D: BatchDelete> {
inner: D,
buffer: HashSet<(String, OpDelete)>,
+ max_batch_size: usize,
}
impl<D: BatchDelete> BatchDeleter<D> {
/// Create a new batch deleter.
- pub fn new(inner: D) -> Self {
+ pub fn new(inner: D, max_batch_size: Option<usize>) -> Self {
+ debug_assert!(
+ max_batch_size.is_some(),
+ "BatchDeleter requires delete_max_size to be configured"
+ );
+ let max_batch_size = max_batch_size.unwrap_or(1);
+
Self {
inner,
buffer: HashSet::default(),
+ max_batch_size,
}
}
-}
-
-impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
- self.buffer.insert((path.to_string(), args));
- Ok(())
- }
- async fn flush(&mut self) -> Result<usize> {
+ async fn flush_buffer(&mut self) -> Result<usize> {
if self.buffer.is_empty() {
return Ok(0);
}
+
if self.buffer.len() == 1 {
let (path, args) = self
.buffer
@@ -95,23 +97,26 @@ impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
let batch = self.buffer.iter().cloned().collect();
let result = self.inner.delete_batch(batch).await?;
- debug_assert!(
- !result.succeeded.is_empty(),
- "the number of succeeded operations must be greater than 0"
- );
- debug_assert_eq!(
- result.succeeded.len() + result.failed.len(),
- self.buffer.len(),
- "the number of succeeded and failed operations must be equal to
the input batch size"
- );
- // Remove all succeeded operations from the buffer.
- let deleted = result.succeeded.len();
+ if result.succeeded.is_empty() {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "batch delete returned zero successes",
+ ));
+ }
+ if result.succeeded.len() + result.failed.len() != self.buffer.len() {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "batch delete result size mismatch",
+ ));
+ }
+
+ let mut deleted = 0;
for i in result.succeeded {
self.buffer.remove(&i);
+ deleted += 1;
}
- // Return directly if there are non-temporary errors.
for (path, op, err) in result.failed {
if !err.is_temporary() {
return Err(err
@@ -120,8 +125,37 @@ impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
}
}
- // Return the number of succeeded operations to allow users to decide
whether
- // to retry or push more delete operations.
Ok(deleted)
}
}
+
+impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ self.buffer.insert((path.to_string(), args));
+ if self.buffer.len() >= self.max_batch_size {
+ let _ = self.flush_buffer().await?;
+ return Ok(());
+ }
+
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ loop {
+ let deleted = self.flush_buffer().await?;
+
+ if self.buffer.is_empty() {
+ break;
+ }
+
+ if deleted == 0 {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "batch delete made no progress while buffer remains",
+ ));
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/core/src/raw/oio/delete/one_shot_delete.rs
b/core/src/raw/oio/delete/one_shot_delete.rs
index 1df00547a..711d5fba8 100644
--- a/core/src/raw/oio/delete/one_shot_delete.rs
+++ b/core/src/raw/oio/delete/one_shot_delete.rs
@@ -22,7 +22,7 @@ use crate::*;
/// OneShotDelete is used to implement [`oio::Delete`] based on one shot
operation.
///
-/// OneShotDeleter will perform delete operation while calling `flush`.
+/// OneShotDeleter will perform delete operation while calling `close`.
pub trait OneShotDelete: Send + Sync + Unpin + 'static {
/// delete_once delete one path at once.
///
@@ -37,43 +37,21 @@ pub trait OneShotDelete: Send + Sync + Unpin + 'static {
/// OneShotDelete is used to implement [`oio::Delete`] based on one shot.
pub struct OneShotDeleter<D> {
inner: D,
- delete: Option<(String, OpDelete)>,
}
impl<D> OneShotDeleter<D> {
/// Create a new one shot deleter.
pub fn new(inner: D) -> Self {
- Self {
- inner,
- delete: None,
- }
- }
-
- fn delete_inner(&mut self, path: String, args: OpDelete) -> Result<()> {
- if self.delete.is_some() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "OneShotDeleter doesn't support batch delete",
- ));
- }
-
- self.delete = Some((path, args));
- Ok(())
+ Self { inner }
}
}
impl<D: OneShotDelete> oio::Delete for OneShotDeleter<D> {
- fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
- self.delete_inner(path.to_string(), args)
+ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+ self.inner.delete_once(path.to_string(), args).await
}
- async fn flush(&mut self) -> Result<usize> {
- let Some((path, args)) = self.delete.clone() else {
- return Ok(0);
- };
-
- self.inner.delete_once(path, args).await?;
- self.delete = None;
- Ok(1)
+ async fn close(&mut self) -> Result<()> {
+ Ok(())
}
}
diff --git a/core/src/services/azblob/backend.rs
b/core/src/services/azblob/backend.rs
index 02cf03630..0aedc029c 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -521,7 +521,10 @@ impl Access for AzblobBackend {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
- oio::BatchDeleter::new(AzblobDeleter::new(self.core.clone())),
+ oio::BatchDeleter::new(
+ AzblobDeleter::new(self.core.clone()),
+ self.core.info.full_capability().delete_max_size,
+ ),
))
}
diff --git a/core/src/services/cloudflare_kv/backend.rs
b/core/src/services/cloudflare_kv/backend.rs
index a02515654..ec8a4ab13 100644
--- a/core/src/services/cloudflare_kv/backend.rs
+++ b/core/src/services/cloudflare_kv/backend.rs
@@ -480,7 +480,10 @@ impl Access for CloudflareKvBackend {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
-
oio::BatchDeleter::new(CloudflareKvDeleter::new(self.core.clone())),
+ oio::BatchDeleter::new(
+ CloudflareKvDeleter::new(self.core.clone()),
+ self.core.info.full_capability().delete_max_size,
+ ),
))
}
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 9c556d032..c57cd9b00 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -429,7 +429,10 @@ impl Access for GcsBackend {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
- oio::BatchDeleter::new(GcsDeleter::new(self.core.clone())),
+ oio::BatchDeleter::new(
+ GcsDeleter::new(self.core.clone()),
+ self.core.info.full_capability().delete_max_size,
+ ),
))
}
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 2e5f9f33a..0e6a5aa01 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -656,7 +656,10 @@ impl Access for OssBackend {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
- oio::BatchDeleter::new(OssDeleter::new(self.core.clone())),
+ oio::BatchDeleter::new(
+ OssDeleter::new(self.core.clone()),
+ self.core.info.full_capability().delete_max_size,
+ ),
))
}
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index c5d42ba5f..7c0e72000 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -1059,7 +1059,10 @@ impl Access for S3Backend {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
- oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
+ oio::BatchDeleter::new(
+ S3Deleter::new(self.core.clone()),
+ self.core.info.full_capability().delete_max_size,
+ ),
))
}
diff --git a/core/src/types/delete/deleter.rs b/core/src/types/delete/deleter.rs
index 4675b9751..05621f0f7 100644
--- a/core/src/types/delete/deleter.rs
+++ b/core/src/types/delete/deleter.rs
@@ -83,38 +83,24 @@ use crate::*;
/// ```
pub struct Deleter {
deleter: oio::Deleter,
-
- max_size: usize,
- cur_size: usize,
}
impl Deleter {
pub(crate) async fn create(acc: Accessor) -> Result<Self> {
- let max_size =
acc.info().full_capability().delete_max_size.unwrap_or(1);
let (_, deleter) = acc.delete().await?;
- Ok(Self {
- deleter,
- max_size,
- cur_size: 0,
- })
+ Ok(Self { deleter })
}
/// Delete a path.
pub async fn delete(&mut self, input: impl IntoDeleteInput) -> Result<()> {
- if self.cur_size >= self.max_size {
- let deleted = self.deleter.flush_dyn().await?;
- self.cur_size -= deleted;
- }
-
let input = input.into_delete_input();
let mut op = OpDelete::default();
if let Some(version) = &input.version {
op = op.with_version(version);
}
- self.deleter.delete_dyn(&input.path, op)?;
- self.cur_size += 1;
+ self.deleter.delete_dyn(&input.path, op).await?;
Ok(())
}
@@ -195,22 +181,9 @@ impl Deleter {
Ok(())
}
- /// Flush the deleter, returns the number of deleted paths.
- pub async fn flush(&mut self) -> Result<usize> {
- let deleted = self.deleter.flush_dyn().await?;
- self.cur_size -= deleted;
- Ok(deleted)
- }
-
/// Close the deleter, this will flush the deleter and wait until all
paths are deleted.
pub async fn close(&mut self) -> Result<()> {
- loop {
- self.flush().await?;
- if self.cur_size == 0 {
- break;
- }
- }
- Ok(())
+ self.deleter.close_dyn().await
}
/// Convert the deleter into a sink.
diff --git a/core/src/types/delete/futures_delete_sink.rs
b/core/src/types/delete/futures_delete_sink.rs
index 793b3e717..5600ebafc 100644
--- a/core/src/types/delete/futures_delete_sink.rs
+++ b/core/src/types/delete/futures_delete_sink.rs
@@ -35,7 +35,6 @@ pub struct FuturesDeleteSink<T: IntoDeleteInput> {
enum State {
Idle(Option<Deleter>),
Delete(BoxedStaticFuture<(Deleter, Result<()>)>),
- Flush(BoxedStaticFuture<(Deleter, Result<usize>)>),
Close(BoxedStaticFuture<(Deleter, Result<()>)>),
}
@@ -60,11 +59,6 @@ impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink<T> {
self.state = State::Idle(Some(deleter));
Poll::Ready(res.map(|_| ()))
}
- State::Flush(fut) => {
- let (deleter, res) = ready!(fut.as_mut().poll(cx));
- self.state = State::Idle(Some(deleter));
- Poll::Ready(res.map(|_| ()))
- }
State::Close(fut) => {
let (deleter, res) = ready!(fut.as_mut().poll(cx));
self.state = State::Idle(Some(deleter));
@@ -100,32 +94,13 @@ impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink<T> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Result<()>> {
loop {
match &mut self.state {
- State::Idle(deleter) => {
- let mut deleter = deleter.take().ok_or_else(|| {
- Error::new(
- ErrorKind::Unexpected,
- "FuturesDeleteSink has been closed or errored",
- )
- })?;
- let fut = async move {
- let res = deleter.flush().await;
- (deleter, res)
- };
- self.state = State::Flush(Box::pin(fut));
- return Poll::Ready(Ok(()));
- }
+ State::Idle(_) => return Poll::Ready(Ok(())),
State::Delete(fut) => {
let (deleter, res) = ready!(fut.as_mut().poll(cx));
self.state = State::Idle(Some(deleter));
res?;
continue;
}
- State::Flush(fut) => {
- let (deleter, res) = ready!(fut.as_mut().poll(cx));
- self.state = State::Idle(Some(deleter));
- let _ = res?;
- return Poll::Ready(Ok(()));
- }
State::Close(fut) => {
let (deleter, res) = ready!(fut.as_mut().poll(cx));
self.state = State::Idle(Some(deleter));
@@ -159,12 +134,6 @@ impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink<T> {
res?;
continue;
}
- State::Flush(fut) => {
- let (deleter, res) = ready!(fut.as_mut().poll(cx));
- self.state = State::Idle(Some(deleter));
- res?;
- continue;
- }
State::Close(fut) => {
let (deleter, res) = ready!(fut.as_mut().poll(cx));
self.state = State::Idle(Some(deleter));
diff --git a/core/src/types/operator/operator.rs
b/core/src/types/operator/operator.rs
index 5f255ab52..82c722e76 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -1336,8 +1336,8 @@ impl Operator {
async fn delete_inner(acc: Accessor, path: String, opts:
options::DeleteOptions) -> Result<()> {
let (_, mut deleter) = acc.delete_dyn().await?;
let args = opts.into();
- deleter.delete_dyn(&path, args)?;
- deleter.flush_dyn().await?;
+ deleter.delete_dyn(&path, args).await?;
+ deleter.close_dyn().await?;
Ok(())
}
diff --git a/integrations/object_store/src/service/mod.rs
b/integrations/object_store/src/service/mod.rs
index 825a2503a..f7bb8f7bf 100644
--- a/integrations/object_store/src/service/mod.rs
+++ b/integrations/object_store/src/service/mod.rs
@@ -143,7 +143,7 @@ impl Access for ObjectStoreService {
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
- let deleter =
BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()));
+ let deleter =
BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()), Some(1000));
Ok((RpDelete::default(), deleter))
}
@@ -355,8 +355,9 @@ mod tests {
let (_, mut deleter) = backend.delete().await.expect("delete should
succeed");
deleter
.delete(path, OpDelete::default())
+ .await
.expect("delete should succeed");
- deleter.flush().await.expect("flush should succeed");
+ deleter.close().await.expect("close should succeed");
// Verify file is deleted
let result = backend.stat(path, OpStat::default()).await;