This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch xuanwo/delete-api-review in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 6b02540326279db7945569d92e7722f71e55a529 Author: Xuanwo <[email protected]> AuthorDate: Thu Nov 27 01:49:33 2025 +0800 refactor!: Refactor oio::Delete to make its API simple Signed-off-by: Xuanwo <[email protected]> --- core/src/blocking/delete.rs | 5 --- core/src/layers/async_backtrace.rs | 9 +++-- core/src/layers/await_tree.rs | 8 ++-- core/src/layers/concurrent_limit.rs | 8 ++-- core/src/layers/correctness_check.rs | 14 +++---- core/src/layers/error_context.rs | 22 ++++------- core/src/layers/fastrace.rs | 11 +++--- core/src/layers/logging.rs | 27 ++++---------- core/src/layers/observe/metrics.rs | 7 ++-- core/src/layers/retry.rs | 55 +++++++++++++++++++--------- core/src/layers/tail_cut.rs | 8 ++-- core/src/layers/timeout.rs | 8 ++-- core/src/layers/tracing.rs | 8 ++-- core/src/raw/oio/delete/api.rs | 45 +++++++++-------------- core/src/raw/oio/delete/batch_delete.rs | 15 +++----- core/src/raw/oio/delete/one_shot_delete.rs | 10 ++--- core/src/types/delete/deleter.rs | 33 ++--------------- core/src/types/delete/futures_delete_sink.rs | 33 +---------------- core/src/types/operator/operator.rs | 4 +- integrations/object_store/src/service/mod.rs | 2 +- 20 files changed, 131 insertions(+), 201 deletions(-) diff --git a/core/src/blocking/delete.rs b/core/src/blocking/delete.rs index 41d091d63..e8b65157f 100644 --- a/core/src/blocking/delete.rs +++ b/core/src/blocking/delete.rs @@ -62,11 +62,6 @@ impl Deleter { self.handle.block_on(self.inner.delete_try_iter(try_iter)) } - /// Flush the deleter, returns the number of deleted paths. - pub fn flush(&mut self) -> Result<usize> { - self.handle.block_on(self.inner.flush()) - } - /// Close the deleter, this will flush the deleter and wait until all paths are deleted. 
pub fn close(&mut self) -> Result<()> { self.handle.block_on(self.inner.close()) diff --git a/core/src/layers/async_backtrace.rs b/core/src/layers/async_backtrace.rs index e68c37817..7cbc1516b 100644 --- a/core/src/layers/async_backtrace.rs +++ b/core/src/layers/async_backtrace.rs @@ -162,12 +162,13 @@ impl<R: oio::List> oio::List for AsyncBacktraceWrapper<R> { } impl<R: oio::Delete> oio::Delete for AsyncBacktraceWrapper<R> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - self.inner.delete(path, args) + #[async_backtrace::framed] + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.inner.delete(path, args).await } #[async_backtrace::framed] - async fn flush(&mut self) -> Result<usize> { - self.inner.flush().await + async fn close(&mut self) -> Result<()> { + self.inner.close().await } } diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs index 416d814ca..449721c72 100644 --- a/core/src/layers/await_tree.rs +++ b/core/src/layers/await_tree.rs @@ -188,13 +188,13 @@ impl<R: oio::List> oio::List for AwaitTreeWrapper<R> { } impl<R: oio::Delete> oio::Delete for AwaitTreeWrapper<R> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - self.inner.delete(path, args) + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.inner.delete(path, args).await } - async fn flush(&mut self) -> Result<usize> { + async fn close(&mut self) -> Result<()> { self.inner - .flush() + .close() .instrument_await(format!("opendal::{}", Operation::Delete)) .await } diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 0f42007ba..4263963e5 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -279,11 +279,11 @@ impl<R: oio::List> oio::List for ConcurrentLimitWrapper<R> { } impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - 
self.inner.delete(path, args) + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.inner.delete(path, args).await } - async fn flush(&mut self) -> Result<usize> { - self.inner.flush().await + async fn close(&mut self) -> Result<()> { + self.inner.close().await } } diff --git a/core/src/layers/correctness_check.rs b/core/src/layers/correctness_check.rs index 0b8c962ff..ca24fe11c 100644 --- a/core/src/layers/correctness_check.rs +++ b/core/src/layers/correctness_check.rs @@ -239,13 +239,13 @@ impl<T> CheckWrapper<T> { } impl<T: oio::Delete> oio::Delete for CheckWrapper<T> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { self.check_delete(&args)?; - self.inner.delete(path, args) + self.inner.delete(path, args).await } - fn flush(&mut self) -> impl Future<Output = Result<usize>> + MaybeSend { - self.inner.flush() + fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend { + self.inner.close() } } @@ -317,12 +317,12 @@ mod tests { struct MockDeleter; impl oio::Delete for MockDeleter { - fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> { + async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> { Ok(()) } - async fn flush(&mut self) -> Result<usize> { - Ok(1) + async fn close(&mut self) -> Result<()> { + Ok(()) } } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 305c62bce..1bd7a4dcd 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -285,8 +285,8 @@ impl<T: oio::List> oio::List for ErrorContextWrapper<T> { } impl<T: oio::Delete> oio::Delete for ErrorContextWrapper<T> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - self.inner.delete(path, args).map_err(|err| { + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.inner.delete(path, args).await.map_err(|err| { err.with_operation(Operation::Delete) 
.with_context("service", self.scheme) .with_context("path", path) @@ -294,17 +294,11 @@ impl<T: oio::Delete> oio::Delete for ErrorContextWrapper<T> { }) } - async fn flush(&mut self) -> Result<usize> { - self.inner - .flush() - .await - .inspect(|&n| { - self.processed += n as u64; - }) - .map_err(|err| { - err.with_operation(Operation::Delete) - .with_context("service", self.scheme) - .with_context("deleted", self.processed.to_string()) - }) + async fn close(&mut self) -> Result<()> { + self.inner.close().await.map_err(|err| { + err.with_operation(Operation::Delete) + .with_context("service", self.scheme) + .with_context("deleted", self.processed.to_string()) + }) } } diff --git a/core/src/layers/fastrace.rs b/core/src/layers/fastrace.rs index 18dd50ca5..fb81d8cce 100644 --- a/core/src/layers/fastrace.rs +++ b/core/src/layers/fastrace.rs @@ -258,14 +258,13 @@ impl<R: oio::List> oio::List for FastraceWrapper<R> { } impl<R: oio::Delete> oio::Delete for FastraceWrapper<R> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - let _g = self.span.set_local_parent(); - let _span = LocalSpan::enter_with_local_parent(Operation::Delete.into_static()); - self.inner.delete(path, args) + #[trace(enter_on_poll = true)] + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.inner.delete(path, args).await } #[trace(enter_on_poll = true)] - async fn flush(&mut self) -> Result<usize> { - self.inner.flush().await + async fn close(&mut self) -> Result<()> { + self.inner.close().await } } diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 82c48f87f..c64fb21ef 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -753,7 +753,6 @@ pub struct LoggingDeleter<D, I: LoggingInterceptor> { info: Arc<AccessorInfo>, logger: I, - queued: usize, deleted: usize, inner: D, } @@ -764,7 +763,6 @@ impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> { info, logger, - queued: 0, deleted: 0, inner, } @@ -772,17 
+770,17 @@ impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> { } impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { let version = args .version() .map(|v| v.to_string()) .unwrap_or_else(|| "<latest>".to_string()); - let res = self.inner.delete(path, args); + let res = self.inner.delete(path, args).await; match &res { Ok(_) => { - self.queued += 1; + self.deleted += 1; } Err(err) => { self.logger.log( @@ -791,7 +789,6 @@ impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> &[ ("path", path), ("version", &version), - ("queued", &self.queued.to_string()), ("deleted", &self.deleted.to_string()), ], "failed", @@ -803,20 +800,15 @@ impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> res } - async fn flush(&mut self) -> Result<usize> { - let res = self.inner.flush().await; + async fn close(&mut self) -> Result<()> { + let res = self.inner.close().await; match &res { - Ok(flushed) => { - self.queued -= flushed; - self.deleted += flushed; + Ok(_) => { self.logger.log( &self.info, Operation::Delete, - &[ - ("queued", &self.queued.to_string()), - ("deleted", &self.deleted.to_string()), - ], + &[("deleted", &self.deleted.to_string())], "succeeded", None, ); @@ -825,10 +817,7 @@ impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> self.logger.log( &self.info, Operation::Delete, - &[ - ("queued", &self.queued.to_string()), - ("deleted", &self.deleted.to_string()), - ], + &[("deleted", &self.deleted.to_string())], "failed", Some(err), ); diff --git a/core/src/layers/observe/metrics.rs b/core/src/layers/observe/metrics.rs index ea779ece1..7441d82f9 100644 --- a/core/src/layers/observe/metrics.rs +++ b/core/src/layers/observe/metrics.rs @@ -935,9 +935,10 @@ impl<R: oio::List, I: MetricsIntercept> oio::List for MetricsWrapper<R, 
I> { } impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for MetricsWrapper<R, I> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { self.inner .delete(path, args) + .await .inspect(|_| { self.size += 1; }) @@ -949,8 +950,8 @@ impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for MetricsWrapper<R, I> { }) } - async fn flush(&mut self) -> Result<usize> { - self.inner.flush().await.inspect_err(|err| { + async fn close(&mut self) -> Result<()> { + self.inner.close().await.inspect_err(|err| { self.interceptor.observe( self.labels.clone().with_error(err.kind()), MetricValue::OperationErrorsTotal, diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 59a63830f..2b5e965b2 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -20,7 +20,6 @@ use std::fmt::Formatter; use std::sync::Arc; use std::time::Duration; -use backon::BlockingRetryable; use backon::ExponentialBuilder; use backon::Retryable; use log::warn; @@ -562,25 +561,43 @@ impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> { } impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - { || self.inner.as_mut().unwrap().delete(path, args.clone()) } - .retry(self.builder) - .when(|e| e.is_temporary()) - .notify(|err, dur| { - self.notify.intercept(err, dur); - }) - .call() - .map_err(|e| e.set_persistent()) + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + use backon::RetryableWithContext; + + let inner = self.take_inner()?; + let path = path.to_string(); + let args_cloned = args.clone(); + + let (inner, res) = { + |mut p: P| { + let path = path.clone(); + let args = args_cloned.clone(); + async move { + let res = p.delete(&path, args).await; + (p, res) + } + } + } + .retry(self.builder) + .when(|e| e.is_temporary()) + .context(inner) + .notify(|err, 
dur| { + self.notify.intercept(err, dur); + }) + .await; + + self.inner = Some(inner); + res.map_err(|e| e.set_persistent()) } - async fn flush(&mut self) -> Result<usize> { + async fn close(&mut self) -> Result<()> { use backon::RetryableWithContext; let inner = self.take_inner()?; let (inner, res) = { |mut p: P| async move { - let res = p.flush().await; + let res = p.close().await; (p, res) } @@ -790,12 +807,12 @@ mod tests { } impl oio::Delete for MockDeleter { - fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> { + async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> { self.size += 1; Ok(()) } - async fn flush(&mut self) -> Result<usize> { + async fn close(&mut self) -> Result<()> { let mut attempt = self.attempt.lock().unwrap(); *attempt += 1; @@ -805,8 +822,8 @@ mod tests { .set_temporary(), ), 2 => { - self.size -= 1; - Ok(1) + self.size = self.size.saturating_sub(1); + Ok(()) } 3 => Err( Error::new(ErrorKind::Unexpected, "retryable_error from deleter") @@ -818,7 +835,11 @@ mod tests { ), 5 => { let s = mem::take(&mut self.size); - Ok(s) + if s == 0 { + Err(Error::new(ErrorKind::Unexpected, "no progress")) + } else { + Ok(()) + } } _ => unreachable!(), } diff --git a/core/src/layers/tail_cut.rs b/core/src/layers/tail_cut.rs index 42a071c60..ab89e5083 100644 --- a/core/src/layers/tail_cut.rs +++ b/core/src/layers/tail_cut.rs @@ -587,11 +587,11 @@ impl<R: oio::List> oio::List for TailCutWrapper<R> { } impl<R: oio::Delete> oio::Delete for TailCutWrapper<R> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - self.inner.delete(path, args) + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.inner.delete(path, args).await } - async fn flush(&mut self) -> Result<usize> { + async fn close(&mut self) -> Result<()> { let deadline = self.calculate_deadline(Operation::Delete); Self::with_io_deadline( deadline, @@ -599,7 +599,7 @@ impl<R: oio::Delete> oio::Delete for TailCutWrapper<R> { &self.stats, 
self.size, Operation::Delete, - self.inner.flush(), + self.inner.close(), ) .await } diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 5240e8f0c..b74bd3f42 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -341,12 +341,12 @@ impl<R: oio::List> oio::List for TimeoutWrapper<R> { } impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - self.inner.delete(path, args) + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.inner.delete(path, args).await } - async fn flush(&mut self) -> Result<usize> { - let fut = self.inner.flush(); + async fn close(&mut self) -> Result<()> { + let fut = self.inner.close(); Self::io_timeout(self.timeout, Operation::Delete.into_static(), fut).await } } diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 00d88b5f7..e980477b4 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -335,15 +335,15 @@ impl<R: oio::List> oio::List for TracingWrapper<R> { } impl<R: oio::Delete> oio::Delete for TracingWrapper<R> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { let _enter = self.span.enter(); - self.inner.delete(path, args) + self.inner.delete(path, args).await } - async fn flush(&mut self) -> Result<usize> { + async fn close(&mut self) -> Result<()> { let _enter = self.span.enter(); - self.inner.flush().await + self.inner.close().await } } diff --git a/core/src/raw/oio/delete/api.rs b/core/src/raw/oio/delete/api.rs index 95053444c..9c04e1183 100644 --- a/core/src/raw/oio/delete/api.rs +++ b/core/src/raw/oio/delete/api.rs @@ -39,35 +39,26 @@ pub trait Delete: Unpin + Send + Sync { /// - `Err(err)`: An error occurred and the deletion request was not queued /// /// # Notes - /// This method just queue the delete request. 
The actual deletion will be - /// performed when `flush` is called. - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()>; + /// This method just queues the delete request. The actual deletion will be + /// performed when `close` is called. + fn delete<'a>(&'a mut self, path: &'a str, args: OpDelete) -> impl Future<Output = Result<()>> + MaybeSend + 'a; - /// Flushes the deletion queue to ensure queued deletions are executed - /// - /// # Returns - /// - `Ok(0)`: All queued deletions have been processed or the queue is empty. - /// - `Ok(count)`: The number of resources successfully deleted. Implementations should - /// return an error if the queue is non-empty but no resources were deleted - /// - `Err(err)`: An error occurred while performing the deletions - /// - /// # Notes - /// - This method is asynchronous and will wait for queued deletions to complete - fn flush(&mut self) -> impl Future<Output = Result<usize>> + MaybeSend; + /// Close the deleter and ensure all queued deletions are executed. 
+ fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend; } impl Delete for () { - fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> { + async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "output deleter doesn't support delete", )) } - async fn flush(&mut self) -> Result<usize> { + async fn close(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, - "output deleter doesn't support flush", + "output deleter doesn't support close", )) } } @@ -75,28 +66,28 @@ impl Delete for () { /// The dyn version of [`Delete`] pub trait DeleteDyn: Unpin + Send + Sync { /// The dyn version of [`Delete::delete`] - fn delete_dyn(&mut self, path: &str, args: OpDelete) -> Result<()>; + fn delete_dyn<'a>(&'a mut self, path: &'a str, args: OpDelete) -> BoxedFuture<'a, Result<()>>; - /// The dyn version of [`Delete::flush`] - fn flush_dyn(&mut self) -> BoxedFuture<'_, Result<usize>>; + /// The dyn version of [`Delete::close`] + fn close_dyn(&mut self) -> BoxedFuture<'_, Result<()>>; } impl<T: Delete + ?Sized> DeleteDyn for T { - fn delete_dyn(&mut self, path: &str, args: OpDelete) -> Result<()> { - Delete::delete(self, path, args) + fn delete_dyn<'a>(&'a mut self, path: &'a str, args: OpDelete) -> BoxedFuture<'a, Result<()>> { + Box::pin(Delete::delete(self, path, args)) } - fn flush_dyn(&mut self) -> BoxedFuture<'_, Result<usize>> { - Box::pin(self.flush()) + fn close_dyn(&mut self) -> BoxedFuture<'_, Result<()>> { + Box::pin(self.close()) } } impl<T: DeleteDyn + ?Sized> Delete for Box<T> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + fn delete<'a>(&'a mut self, path: &'a str, args: OpDelete) -> impl Future<Output = Result<()>> + MaybeSend + 'a { self.deref_mut().delete_dyn(path, args) } - async fn flush(&mut self) -> Result<usize> { - self.deref_mut().flush_dyn().await + async fn close(&mut self) -> Result<()> { + self.deref_mut().close_dyn().await } } diff --git 
a/core/src/raw/oio/delete/batch_delete.rs b/core/src/raw/oio/delete/batch_delete.rs index d244a22ad..2ecdf2df3 100644 --- a/core/src/raw/oio/delete/batch_delete.rs +++ b/core/src/raw/oio/delete/batch_delete.rs @@ -23,7 +23,7 @@ use crate::*; /// BatchDelete is used to implement [`oio::Delete`] based on batch delete operation. /// -/// OneShotDeleter will perform delete operation while calling `flush`. +/// OneShotDeleter will perform delete operation while calling `close`. pub trait BatchDelete: Send + Sync + Unpin + 'static { /// delete_once delete one path at once. /// @@ -72,14 +72,14 @@ impl<D: BatchDelete> BatchDeleter<D> { } impl<D: BatchDelete> oio::Delete for BatchDeleter<D> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { self.buffer.insert((path.to_string(), args)); Ok(()) } - async fn flush(&mut self) -> Result<usize> { + async fn close(&mut self) -> Result<()> { if self.buffer.is_empty() { - return Ok(0); + return Ok(()); } if self.buffer.len() == 1 { let (path, args) = self @@ -90,7 +90,7 @@ impl<D: BatchDelete> oio::Delete for BatchDeleter<D> { .clone(); self.inner.delete_once(path, args).await?; self.buffer.clear(); - return Ok(1); + return Ok(()); } let batch = self.buffer.iter().cloned().collect(); @@ -106,7 +106,6 @@ impl<D: BatchDelete> oio::Delete for BatchDeleter<D> { ); // Remove all succeeded operations from the buffer. - let deleted = result.succeeded.len(); for i in result.succeeded { self.buffer.remove(&i); } @@ -120,8 +119,6 @@ impl<D: BatchDelete> oio::Delete for BatchDeleter<D> { } } - // Return the number of succeeded operations to allow users to decide whether - // to retry or push more delete operations. 
- Ok(deleted) + Ok(()) } } diff --git a/core/src/raw/oio/delete/one_shot_delete.rs b/core/src/raw/oio/delete/one_shot_delete.rs index 1df00547a..204e1515a 100644 --- a/core/src/raw/oio/delete/one_shot_delete.rs +++ b/core/src/raw/oio/delete/one_shot_delete.rs @@ -22,7 +22,7 @@ use crate::*; /// OneShotDelete is used to implement [`oio::Delete`] based on one shot operation. /// -/// OneShotDeleter will perform delete operation while calling `flush`. +/// OneShotDeleter will perform delete operation while calling `close`. pub trait OneShotDelete: Send + Sync + Unpin + 'static { /// delete_once delete one path at once. /// @@ -63,17 +63,17 @@ impl<D> OneShotDeleter<D> { } impl<D: OneShotDelete> oio::Delete for OneShotDeleter<D> { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { self.delete_inner(path.to_string(), args) } - async fn flush(&mut self) -> Result<usize> { + async fn close(&mut self) -> Result<()> { let Some((path, args)) = self.delete.clone() else { - return Ok(0); + return Ok(()); }; self.inner.delete_once(path, args).await?; self.delete = None; - Ok(1) + Ok(()) } } diff --git a/core/src/types/delete/deleter.rs b/core/src/types/delete/deleter.rs index 4675b9751..05621f0f7 100644 --- a/core/src/types/delete/deleter.rs +++ b/core/src/types/delete/deleter.rs @@ -83,38 +83,24 @@ use crate::*; /// ``` pub struct Deleter { deleter: oio::Deleter, - - max_size: usize, - cur_size: usize, } impl Deleter { pub(crate) async fn create(acc: Accessor) -> Result<Self> { - let max_size = acc.info().full_capability().delete_max_size.unwrap_or(1); let (_, deleter) = acc.delete().await?; - Ok(Self { - deleter, - max_size, - cur_size: 0, - }) + Ok(Self { deleter }) } /// Delete a path. 
pub async fn delete(&mut self, input: impl IntoDeleteInput) -> Result<()> { - if self.cur_size >= self.max_size { - let deleted = self.deleter.flush_dyn().await?; - self.cur_size -= deleted; - } - let input = input.into_delete_input(); let mut op = OpDelete::default(); if let Some(version) = &input.version { op = op.with_version(version); } - self.deleter.delete_dyn(&input.path, op)?; - self.cur_size += 1; + self.deleter.delete_dyn(&input.path, op).await?; Ok(()) } @@ -195,22 +181,9 @@ impl Deleter { Ok(()) } - /// Flush the deleter, returns the number of deleted paths. - pub async fn flush(&mut self) -> Result<usize> { - let deleted = self.deleter.flush_dyn().await?; - self.cur_size -= deleted; - Ok(deleted) - } - /// Close the deleter, this will flush the deleter and wait until all paths are deleted. pub async fn close(&mut self) -> Result<()> { - loop { - self.flush().await?; - if self.cur_size == 0 { - break; - } - } - Ok(()) + self.deleter.close_dyn().await } /// Convert the deleter into a sink. 
diff --git a/core/src/types/delete/futures_delete_sink.rs b/core/src/types/delete/futures_delete_sink.rs index 793b3e717..5600ebafc 100644 --- a/core/src/types/delete/futures_delete_sink.rs +++ b/core/src/types/delete/futures_delete_sink.rs @@ -35,7 +35,6 @@ pub struct FuturesDeleteSink<T: IntoDeleteInput> { enum State { Idle(Option<Deleter>), Delete(BoxedStaticFuture<(Deleter, Result<()>)>), - Flush(BoxedStaticFuture<(Deleter, Result<usize>)>), Close(BoxedStaticFuture<(Deleter, Result<()>)>), } @@ -60,11 +59,6 @@ impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink<T> { self.state = State::Idle(Some(deleter)); Poll::Ready(res.map(|_| ())) } - State::Flush(fut) => { - let (deleter, res) = ready!(fut.as_mut().poll(cx)); - self.state = State::Idle(Some(deleter)); - Poll::Ready(res.map(|_| ())) - } State::Close(fut) => { let (deleter, res) = ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(deleter)); @@ -100,32 +94,13 @@ impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink<T> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { loop { match &mut self.state { - State::Idle(deleter) => { - let mut deleter = deleter.take().ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "FuturesDeleteSink has been closed or errored", - ) - })?; - let fut = async move { - let res = deleter.flush().await; - (deleter, res) - }; - self.state = State::Flush(Box::pin(fut)); - return Poll::Ready(Ok(())); - } + State::Idle(_) => return Poll::Ready(Ok(())), State::Delete(fut) => { let (deleter, res) = ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(deleter)); res?; continue; } - State::Flush(fut) => { - let (deleter, res) = ready!(fut.as_mut().poll(cx)); - self.state = State::Idle(Some(deleter)); - let _ = res?; - return Poll::Ready(Ok(())); - } State::Close(fut) => { let (deleter, res) = ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(deleter)); @@ -159,12 +134,6 @@ impl<T: IntoDeleteInput> Sink<T> for 
FuturesDeleteSink<T> { res?; continue; } - State::Flush(fut) => { - let (deleter, res) = ready!(fut.as_mut().poll(cx)); - self.state = State::Idle(Some(deleter)); - res?; - continue; - } State::Close(fut) => { let (deleter, res) = ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(deleter)); diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 5f255ab52..82c722e76 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -1336,8 +1336,8 @@ impl Operator { async fn delete_inner(acc: Accessor, path: String, opts: options::DeleteOptions) -> Result<()> { let (_, mut deleter) = acc.delete_dyn().await?; let args = opts.into(); - deleter.delete_dyn(&path, args)?; - deleter.flush_dyn().await?; + deleter.delete_dyn(&path, args).await?; + deleter.close_dyn().await?; Ok(()) } diff --git a/integrations/object_store/src/service/mod.rs b/integrations/object_store/src/service/mod.rs index 825a2503a..35a59020a 100644 --- a/integrations/object_store/src/service/mod.rs +++ b/integrations/object_store/src/service/mod.rs @@ -356,7 +356,7 @@ mod tests { deleter .delete(path, OpDelete::default()) .expect("delete should succeed"); - deleter.flush().await.expect("flush should succeed"); + deleter.close().await.expect("close should succeed"); // Verify file is deleted let result = backend.stat(path, OpStat::default()).await;
