This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new b464f04a7 refactor: Implement SimulateLayer to make simulate logic
more maintai… (#6822)
b464f04a7 is described below
commit b464f04a778d7e11e326a17d7419c5936b90e502
Author: Xuanwo <[email protected]>
AuthorDate: Wed Nov 26 23:18:01 2025 +0800
refactor: Implement SimulateLayer to make simulate logic more maintai…
(#6822)
* refactor: Implement SimulateLayer to make simulate logic more maintainable
Signed-off-by: Xuanwo <[email protected]>
* Fix ci
Signed-off-by: Xuanwo <[email protected]>
* start after is not supported before
Signed-off-by: Xuanwo <[email protected]>
* Fix ci
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/layers/complete.rs | 161 ++---------------------
core/src/layers/mod.rs | 3 +
core/src/layers/simulate.rs | 256 +++++++++++++++++++++++++++++++++++++
core/src/types/operator/builder.rs | 1 +
4 files changed, 271 insertions(+), 150 deletions(-)
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 830db569d..75fa9ee09 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -20,54 +20,22 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
-use crate::raw::oio::FlatLister;
-use crate::raw::oio::PrefixLister;
-use crate::raw::*;
+use crate::raw::oio;
+use crate::raw::{
+ Access, AccessorInfo, Layer, LayeredAccess, OpCreateDir, OpList, OpPresign, OpRead, OpStat,
+ OpWrite, RpCreateDir, RpDelete, RpList, RpPresign, RpRead, RpStat, RpWrite,
+};
use crate::*;
-/// Complete underlying services features so that users can use them in
-/// the same way.
-///
-/// # Notes
-///
-/// CompleteLayer is not a public accessible layer that can be used by
-/// external users. OpenDAL will make sure every accessor will apply this
-/// layer once and only once.
-///
-/// # Internal
-///
-/// So far `CompleteLayer` will do the following things:
-///
-/// ## Stat Completion
-///
-/// Not all services support stat dir natively, but we can simulate it via list.
-///
-/// ## List Completion
-///
-/// There are two styles of list, but not all services support both of
-/// them. CompleteLayer will add those capabilities in a zero cost way.
-///
-/// Underlying services will return [`Capability`] to indicate the
-/// features that returning listers support.
-///
-/// - If support `list_with_recursive`, return directly.
-/// - if not, wrap with [`FlatLister`].
+/// CompleteLayer keeps validation wrappers for read/write operations.
pub struct CompleteLayer;
impl<A: Access> Layer<A> for CompleteLayer {
type LayeredAccess = CompleteAccessor<A>;
fn layer(&self, inner: A) -> Self::LayeredAccess {
- let info = inner.info();
- info.update_full_capability(|mut cap| {
- if cap.list && cap.write_can_empty {
- cap.create_dir = true;
- }
- cap
- });
-
CompleteAccessor {
- info,
+ info: inner.info(),
inner: Arc::new(inner),
}
}
@@ -85,115 +53,11 @@ impl<A: Access> Debug for CompleteAccessor<A> {
}
}
-impl<A: Access> CompleteAccessor<A> {
- async fn complete_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
- let capability = self.info.native_capability();
- if capability.create_dir {
- return self.inner().create_dir(path, args).await;
- }
-
- if capability.write_can_empty && capability.list {
- let (_, mut w) = self.inner.write(path, OpWrite::default()).await?;
- oio::Write::close(&mut w).await?;
- return Ok(RpCreateDir::default());
- }
-
- self.inner.create_dir(path, args).await
- }
-
- async fn complete_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
- let capability = self.info.native_capability();
-
- if path == "/" {
- return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
- }
-
- // Forward to inner if create_dir is supported.
- if path.ends_with('/') && capability.create_dir {
- let meta = self.inner.stat(path, args).await?.into_metadata();
-
- if meta.is_file() {
- return Err(Error::new(
- ErrorKind::NotFound,
- "stat expected a directory, but found a file",
- ));
- }
-
- return Ok(RpStat::new(meta));
- }
-
- // Otherwise, we can simulate stat dir via `list`.
- if path.ends_with('/') && capability.list_with_recursive {
- let (_, mut l) = self
- .inner
- .list(path, OpList::default().with_recursive(true).with_limit(1))
- .await?;
-
- return if oio::List::next(&mut l).await?.is_some() {
- Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
- } else {
- Err(Error::new(
- ErrorKind::NotFound,
- "the directory is not found",
- ))
- };
- }
-
- // Forward to underlying storage directly since we don't know how to handle stat dir.
- self.inner.stat(path, args).await
- }
-
- async fn complete_list(
- &self,
- path: &str,
- args: OpList,
- ) -> Result<(RpList, CompleteLister<A, A::Lister>)> {
- let cap = self.info.native_capability();
-
- let recursive = args.recursive();
-
- match (recursive, cap.list_with_recursive) {
- // - If service can list_with_recursive, we can forward list to it directly.
- (_, true) => {
- let (rp, p) = self.inner.list(path, args).await?;
- Ok((rp, CompleteLister::One(p)))
- }
- // If recursive is true but service can't list_with_recursive
- (true, false) => {
- // Forward path that ends with /
- if path.ends_with('/') {
- let p = FlatLister::new(self.inner.clone(), path);
- Ok((RpList::default(), CompleteLister::Two(p)))
- } else {
- let parent = get_parent(path);
- let p = FlatLister::new(self.inner.clone(), parent);
- let p = PrefixLister::new(p, path);
- Ok((RpList::default(), CompleteLister::Four(p)))
- }
- }
- // If recursive and service doesn't support list_with_recursive, we need to handle
- // list prefix by ourselves.
- (false, false) => {
- // Forward path that ends with /
- if path.ends_with('/') {
- let (rp, p) = self.inner.list(path, args).await?;
- Ok((rp, CompleteLister::One(p)))
- } else {
- let parent = get_parent(path);
- let (rp, p) = self.inner.list(parent, args).await?;
- let p = PrefixLister::new(p, path);
- Ok((rp, CompleteLister::Three(p)))
- }
- }
- }
- }
-}
-
impl<A: Access> LayeredAccess for CompleteAccessor<A> {
type Inner = A;
type Reader = CompleteReader<A::Reader>;
type Writer = CompleteWriter<A::Writer>;
- type Lister = CompleteLister<A, A::Lister>;
+ type Lister = A::Lister;
type Deleter = A::Deleter;
fn inner(&self) -> &Self::Inner {
@@ -205,7 +69,7 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
}
async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
- self.complete_create_dir(path, args).await
+ self.inner().create_dir(path, args).await
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
@@ -223,7 +87,7 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
}
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
- self.complete_stat(path, args).await
+ self.inner.stat(path, args).await
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
@@ -231,7 +95,7 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
- self.complete_list(path, args).await
+ self.inner.list(path, args).await
}
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
@@ -239,9 +103,6 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
}
}
-pub type CompleteLister<A, P> =
- FourWays<P, FlatLister<Arc<A>, P>, PrefixLister<P>, PrefixLister<FlatLister<Arc<A>, P>>>;
-
pub struct CompleteReader<R> {
inner: R,
size: Option<u64>,
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index 6c789be48..cffc12ead 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -26,6 +26,9 @@ pub(crate) use error_context::ErrorContextLayer;
mod complete;
pub(crate) use complete::CompleteLayer;
+mod simulate;
+pub use simulate::SimulateLayer;
+
mod concurrent_limit;
pub use concurrent_limit::ConcurrentLimitLayer;
diff --git a/core/src/layers/simulate.rs b/core/src/layers/simulate.rs
new file mode 100644
index 000000000..ee340fc5c
--- /dev/null
+++ b/core/src/layers/simulate.rs
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use crate::raw::oio::FlatLister;
+use crate::raw::oio::PrefixLister;
+use crate::raw::*;
+use crate::*;
+
+/// Simulate missing capabilities for backends in a configurable way.
+#[derive(Debug, Clone)]
+pub struct SimulateLayer {
+ list_recursive: bool,
+ stat_dir: bool,
+ create_dir: bool,
+}
+
+impl Default for SimulateLayer {
+ fn default() -> Self {
+ Self {
+ list_recursive: true,
+ stat_dir: true,
+ create_dir: true,
+ }
+ }
+}
+
+impl SimulateLayer {
+ /// Enable or disable recursive list simulation. Default: true.
+ pub fn with_list_recursive(mut self, enabled: bool) -> Self {
+ self.list_recursive = enabled;
+ self
+ }
+
+ /// Enable or disable stat dir simulation. Default: true.
+ pub fn with_stat_dir(mut self, enabled: bool) -> Self {
+ self.stat_dir = enabled;
+ self
+ }
+
+ /// Enable or disable create_dir simulation. Default: true.
+ pub fn with_create_dir(mut self, enabled: bool) -> Self {
+ self.create_dir = enabled;
+ self
+ }
+}
+
+impl<A: Access> Layer<A> for SimulateLayer {
+ type LayeredAccess = SimulateAccessor<A>;
+
+ fn layer(&self, inner: A) -> Self::LayeredAccess {
+ let info = inner.info();
+ info.update_full_capability(|mut cap| {
+ if self.create_dir && cap.list && cap.write_can_empty {
+ cap.create_dir = true;
+ }
+ cap
+ });
+
+ SimulateAccessor {
+ config: self.clone(),
+ info,
+ inner: Arc::new(inner),
+ }
+ }
+}
+
+/// Accessor that applies capability simulation.
+pub struct SimulateAccessor<A: Access> {
+ config: SimulateLayer,
+ info: Arc<AccessorInfo>,
+ inner: Arc<A>,
+}
+
+impl<A: Access> Debug for SimulateAccessor<A> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ self.inner.fmt(f)
+ }
+}
+
+impl<A: Access> SimulateAccessor<A> {
+ async fn simulate_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+ let capability = self.info.native_capability();
+
+ if capability.create_dir || !self.config.create_dir {
+ return self.inner().create_dir(path, args).await;
+ }
+
+ if capability.write_can_empty && capability.list {
+ let (_, mut w) = self.inner.write(path, OpWrite::default()).await?;
+ oio::Write::close(&mut w).await?;
+ return Ok(RpCreateDir::default());
+ }
+
+ self.inner.create_dir(path, args).await
+ }
+
+ async fn simulate_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+ let capability = self.info.native_capability();
+
+ if path == "/" {
+ return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+ }
+
+ if path.ends_with('/') {
+ if capability.create_dir {
+ let meta = self.inner.stat(path, args.clone()).await?.into_metadata();
+
+ if meta.is_file() {
+ return Err(Error::new(
+ ErrorKind::NotFound,
+ "stat expected a directory, but found a file",
+ ));
+ }
+
+ return Ok(RpStat::new(meta));
+ }
+
+ if self.config.stat_dir && capability.list_with_recursive {
+ let (_, mut l) = self
+ .inner
+ .list(path, OpList::default().with_recursive(true).with_limit(1))
+ .await?;
+
+ return if oio::List::next(&mut l).await?.is_some() {
+ Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+ } else {
+ Err(Error::new(
+ ErrorKind::NotFound,
+ "the directory is not found",
+ ))
+ };
+ }
+ }
+
+ self.inner.stat(path, args).await
+ }
+
+ async fn simulate_list(
+ &self,
+ path: &str,
+ args: OpList,
+ ) -> Result<(RpList, SimulateLister<A, A::Lister>)> {
+ let cap = self.info.native_capability();
+
+ let recursive = args.recursive();
+ let forward = args;
+
+ let (rp, lister) = match (
+ recursive,
+ cap.list_with_recursive,
+ self.config.list_recursive,
+ ) {
+ // Backend supports recursive list, forward directly.
+ (_, true, _) => {
+ let (rp, p) = self.inner.list(path, forward).await?;
+ (rp, SimulateLister::One(p))
+ }
+ // Simulate recursive via flat list when enabled.
+ (true, false, true) => {
+ if path.ends_with('/') {
+ let p = FlatLister::new(self.inner.clone(), path);
+ (RpList::default(), SimulateLister::Two(p))
+ } else {
+ let parent = get_parent(path);
+ let p = FlatLister::new(self.inner.clone(), parent);
+ let p = PrefixLister::new(p, path);
+ (RpList::default(), SimulateLister::Four(p))
+ }
+ }
+ // Recursive requested but simulation disabled; rely on backend and propagate errors.
+ (true, false, false) => {
+ let (rp, p) = self.inner.list(path, forward).await?;
+ (rp, SimulateLister::One(p))
+ }
+ // Non-recursive list: keep existing prefix handling semantics.
+ (false, false, _) => {
+ if path.ends_with('/') {
+ let (rp, p) = self.inner.list(path, forward).await?;
+ (rp, SimulateLister::One(p))
+ } else {
+ let parent = get_parent(path);
+ let (rp, p) = self.inner.list(parent, forward).await?;
+ let p = PrefixLister::new(p, path);
+ (rp, SimulateLister::Three(p))
+ }
+ }
+ };
+
+ Ok((rp, lister))
+ }
+}
+
+impl<A: Access> LayeredAccess for SimulateAccessor<A> {
+ type Inner = A;
+ type Reader = A::Reader;
+ type Writer = A::Writer;
+ type Lister = SimulateLister<A, A::Lister>;
+ type Deleter = A::Deleter;
+
+ fn inner(&self) -> &Self::Inner {
+ &self.inner
+ }
+
+ fn info(&self) -> Arc<AccessorInfo> {
+ self.info.clone()
+ }
+
+ async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+ self.simulate_create_dir(path, args).await
+ }
+
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+ self.inner.read(path, args).await
+ }
+
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+ self.inner.write(path, args).await
+ }
+
+ async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+ self.simulate_stat(path, args).await
+ }
+
+ async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+ self.inner().delete().await
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
+ self.simulate_list(path, args).await
+ }
+
+ async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+ self.inner.presign(path, args).await
+ }
+}
+
+pub type SimulateLister<A, P> =
+ FourWays<P, FlatLister<Arc<A>, P>, PrefixLister<P>, PrefixLister<FlatLister<Arc<A>, P>>>;
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 1954ea85d..294ec162c 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -423,6 +423,7 @@ impl<A: Access> OperatorBuilder<A> {
// Make sure error context layer has been attached.
OperatorBuilder { accessor }
.layer(ErrorContextLayer)
+ .layer(SimulateLayer::default())
.layer(CompleteLayer)
.layer(CorrectnessCheckLayer)
}