This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new bb145e66d feat: Add from_uri support for more object storage services (#6665)
bb145e66d is described below

commit bb145e66d2040ec201d1aaf0d4a1e2a30720c7d9
Author: Xuanwo <[email protected]>
AuthorDate: Tue Oct 14 17:39:27 2025 +0900

    feat: Add from_uri support for more object storage services (#6665)
    
    * feat: Add from_uri support for all object storage services
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Introduce operator uri
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix tests
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix fs
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Make clippy happy
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Format code
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/blocking/operator.rs                |   8 +-
 core/src/docs/rfcs/5444_operator_from_uri.md |  32 ++--
 core/src/services/azblob/backend.rs          |  50 ++++-
 core/src/services/azblob/mod.rs              |   2 +-
 core/src/services/b2/backend.rs              |  40 +++-
 core/src/services/b2/mod.rs                  |   2 +-
 core/src/services/cos/backend.rs             |  38 +++-
 core/src/services/cos/mod.rs                 |   2 +-
 core/src/services/fs/backend.rs              |  52 +++---
 core/src/services/gcs/backend.rs             |  38 +++-
 core/src/services/gcs/mod.rs                 |   2 +-
 core/src/services/memory/backend.rs          |  27 ++-
 core/src/services/obs/backend.rs             |  38 +++-
 core/src/services/obs/mod.rs                 |   2 +-
 core/src/services/oss/backend.rs             |  38 +++-
 core/src/services/oss/mod.rs                 |   2 +-
 core/src/services/s3/backend.rs              |  41 +++--
 core/src/services/upyun/backend.rs           |  38 +++-
 core/src/services/upyun/mod.rs               |   2 +-
 core/src/types/builder.rs                    |   7 +-
 core/src/types/mod.rs                        |   2 +
 core/src/types/operator/builder.rs           |  10 +-
 core/src/types/operator/mod.rs               |   3 +
 core/src/types/operator/registry.rs          |  70 +++----
 core/src/types/operator/uri.rs               | 264 +++++++++++++++++++++++++++
 25 files changed, 660 insertions(+), 150 deletions(-)

diff --git a/core/src/blocking/operator.rs b/core/src/blocking/operator.rs
index 0ae55953f..037b65a19 100644
--- a/core/src/blocking/operator.rs
+++ b/core/src/blocking/operator.rs
@@ -18,6 +18,7 @@
 use tokio::runtime::Handle;
 
 use crate::Operator as AsyncOperator;
+use crate::types::IntoOperatorUri;
 use crate::*;
 
 /// Use OpenDAL in blocking context.
@@ -136,11 +137,8 @@ impl Operator {
     }
 
     /// Create a blocking operator from URI based configuration.
-    pub fn from_uri(
-        uri: &str,
-        options: impl IntoIterator<Item = (String, String)>,
-    ) -> Result<Self> {
-        let op = AsyncOperator::from_uri(uri, options)?;
+    pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Self> {
+        let op = AsyncOperator::from_uri(uri)?;
         Self::new(op)
     }
 
diff --git a/core/src/docs/rfcs/5444_operator_from_uri.md b/core/src/docs/rfcs/5444_operator_from_uri.md
index 5b5f21c2d..5bb203d5b 100644
--- a/core/src/docs/rfcs/5444_operator_from_uri.md
+++ b/core/src/docs/rfcs/5444_operator_from_uri.md
@@ -24,18 +24,17 @@ The new API allows creating operators directly from URIs:
 
 ```rust
 // Create an operator using URI
-let op = Operator::from_uri("s3://my-bucket/path", vec![
-    ("endpoint".to_string(), "http://localhost:8080"to_string()),
-])?;
+let op = Operator::from_uri("s3://my-bucket/path")?;
 
 // Users can pass options through the URI along with additional key-value pairs
 // The extra options will override identical options specified in the URI
-let op = Operator::from_uri("s3://my-bucket/path?region=us-east-1", vec![
-    ("endpoint".to_string(), "http://localhost:8080"to_string()),
-])?;
+let op = Operator::from_uri((
+    "s3://my-bucket/path?region=us-east-1",
+    [("endpoint", "http://localhost:8080")],
+))?;
 
 // Create a file system operator
-let op = Operator::from_uri("fs:///tmp/test", vec![])?;
+let op = Operator::from_uri("fs:///tmp/test")?;
 ```
 
 OpenDAL will, by default, register services enabled by features in a global `OperatorRegistry`. Users can also create custom operator registries to support their own schemes or additional options.
@@ -53,7 +52,7 @@ registry.register::<services::S3>("r2");     // Cloudflare R2 is S3-compatible
 registry.register::<services::S3>("company-storage");
 registry.register::<services::Azblob>("backup-storage");
 
-let op = registry.load("company-storage://bucket/path", [])?;
+let op = registry.load("company-storage://bucket/path")?;
 ```
 
 # Reference-level explanation
@@ -62,12 +61,10 @@ The implementation consists of three main components:
 
 1. The `OperatorFactory` and `OperatorRegistry`:
 
-`OperatorFactory` is a function type that takes a URI string plus options and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes.
-
-`OperatorFactory` is a function type that takes a URI string plus options and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes.
+`OperatorFactory` is a function type that takes a parsed `OperatorUri` and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes.
 
 ```rust
-type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
+type OperatorFactory = fn(&OperatorUri) -> Result<Operator>;
 
 pub struct OperatorRegistry { ... }
 
@@ -76,7 +73,7 @@ impl OperatorRegistry {
         ...
     }
 
-    fn load(&self, uri: &str, options: impl IntoIterator<Item = (String, String)>) -> Result<Operator> {
+    fn load(&self, uri: impl IntoOperatorUri) -> Result<Operator> {
         ...
     }
 }
@@ -84,11 +81,11 @@ impl OperatorRegistry {
 
 2. The `Configurator` trait extension:
 
-`Configurator` will add a new API to create a configuration from a URI and options. Services should only parse the URI components relevant to their configuration (host, path, query parameters) without concerning themselves with the scheme portion.
+`Configurator` will add a new API to create a configuration from a parsed `OperatorUri`. Services should only inspect the URI components relevant to their configuration (name, root, options) without concerning themselves with the scheme portion.
 
 ```rust
 impl Configurator for S3Config {
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
         ...
     }
 }
@@ -102,10 +99,7 @@ The `Operator` trait will add a new `from_uri` method to create an operator from
 
 ```rust
 impl Operator {
-    pub fn from_uri(
-        uri: &str,
-        options: impl IntoIterator<Item = (String, String)>,
-    ) -> Result<Self> {
+    pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Self> {
         ...
     }
 }
diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs
index db34c83d8..df81c11a7 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -30,7 +30,7 @@ use reqsign::AzureStorageSigner;
 use sha2::Digest;
 use sha2::Sha256;
 
-use super::DEFAULT_SCHEME;
+use super::AZBLOB_SCHEME;
 use super::core::AzblobCore;
 use super::core::constants::X_MS_META_PREFIX;
 use super::core::constants::X_MS_VERSION_ID;
@@ -41,6 +41,7 @@ use super::writer::AzblobWriter;
 use super::writer::AzblobWriters;
 use crate::raw::*;
 use crate::services::AzblobConfig;
+use crate::types::OperatorUri;
 use crate::*;
 const AZBLOB_BATCH_LIMIT: usize = 256;
 
@@ -59,6 +60,20 @@ impl From<AzureStorageConfig> for AzblobConfig {
 impl Configurator for AzblobConfig {
     type Builder = AzblobBuilder;
 
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
+
+        if let Some(container) = uri.name() {
+            map.insert("container".to_string(), container.to_string());
+        }
+
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
+        }
+
+        Self::from_iter(map)
+    }
+
     #[allow(deprecated)]
     fn into_builder(self) -> Self::Builder {
         AzblobBuilder {
@@ -397,7 +412,7 @@ impl Builder for AzblobBuilder {
             core: Arc::new(AzblobCore {
                 info: {
                     let am = AccessorInfo::default();
-                    am.set_scheme(DEFAULT_SCHEME)
+                    am.set_scheme(AZBLOB_SCHEME)
                         .set_root(&root)
                         .set_name(container)
                         .set_native_capability(Capability {
@@ -592,3 +607,34 @@ impl Access for AzblobBackend {
         )))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Configurator;
+    use crate::types::OperatorUri;
+
+    #[test]
+    fn from_uri_with_host_container() {
+        let uri = OperatorUri::new(
+            "azblob://my-container/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = AzblobConfig::from_uri(&uri).unwrap();
+        assert_eq!(cfg.container, "my-container");
+        assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+    }
+
+    #[test]
+    fn from_uri_with_path_container() {
+        let uri = OperatorUri::new(
+            "azblob://my-container/nested/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = AzblobConfig::from_uri(&uri).unwrap();
+        assert_eq!(cfg.container, "my-container");
+        assert_eq!(cfg.root.as_deref(), Some("nested/root"));
+    }
+}
diff --git a/core/src/services/azblob/mod.rs b/core/src/services/azblob/mod.rs
index 818ed4628..f014a29ef 100644
--- a/core/src/services/azblob/mod.rs
+++ b/core/src/services/azblob/mod.rs
@@ -17,7 +17,7 @@
 
 /// Default scheme for azblob service.
 #[cfg(feature = "services-azblob")]
-pub(super) const DEFAULT_SCHEME: &str = "azblob";
+pub const AZBLOB_SCHEME: &str = "azblob";
 #[cfg(feature = "services-azblob")]
 pub(crate) mod core;
 #[cfg(feature = "services-azblob")]
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
index 39148a76f..d545fce5a 100644
--- a/core/src/services/b2/backend.rs
+++ b/core/src/services/b2/backend.rs
@@ -25,7 +25,7 @@ use http::StatusCode;
 use log::debug;
 use tokio::sync::RwLock;
 
-use super::DEFAULT_SCHEME;
+use super::B2_SCHEME;
 use super::core::B2Core;
 use super::core::B2Signer;
 use super::core::constants;
@@ -37,10 +37,25 @@ use super::writer::B2Writer;
 use super::writer::B2Writers;
 use crate::raw::*;
 use crate::services::B2Config;
+use crate::types::OperatorUri;
 use crate::*;
 impl Configurator for B2Config {
     type Builder = B2Builder;
 
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
+
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
+        }
+
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
+        }
+
+        Self::from_iter(map)
+    }
+
     #[allow(deprecated)]
     fn into_builder(self) -> Self::Builder {
         B2Builder {
@@ -191,7 +206,7 @@ impl Builder for B2Builder {
             core: Arc::new(B2Core {
                 info: {
                     let am = AccessorInfo::default();
-                    am.set_scheme(DEFAULT_SCHEME)
+                    am.set_scheme(B2_SCHEME)
                         .set_root(&root)
                         .set_native_capability(Capability {
                             stat: true,
@@ -432,3 +447,24 @@ impl Access for B2Backend {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Configurator;
+    use crate::types::OperatorUri;
+
+    #[test]
+    fn from_uri_extracts_bucket_and_root() {
+        let uri = OperatorUri::new(
+            "b2://example-bucket/path/to/root".parse().unwrap(),
+            vec![("bucket_id".to_string(), "bucket-id".to_string())],
+        )
+        .unwrap();
+
+        let cfg = B2Config::from_uri(&uri).unwrap();
+        assert_eq!(cfg.bucket, "example-bucket");
+        assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+        assert_eq!(cfg.bucket_id, "bucket-id");
+    }
+}
diff --git a/core/src/services/b2/mod.rs b/core/src/services/b2/mod.rs
index 09c6b4bfe..6a4778994 100644
--- a/core/src/services/b2/mod.rs
+++ b/core/src/services/b2/mod.rs
@@ -17,7 +17,7 @@
 
 /// Default scheme for b2 service.
 #[cfg(feature = "services-b2")]
-pub(super) const DEFAULT_SCHEME: &str = "b2";
+pub const B2_SCHEME: &str = "b2";
 #[cfg(feature = "services-b2")]
 mod core;
 #[cfg(feature = "services-b2")]
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 93969e28c..780e1aa78 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -26,7 +26,7 @@ use reqsign::TencentCosConfig;
 use reqsign::TencentCosCredentialLoader;
 use reqsign::TencentCosSigner;
 
-use super::DEFAULT_SCHEME;
+use super::COS_SCHEME;
 use super::core::*;
 use super::delete::CosDeleter;
 use super::error::parse_error;
@@ -38,10 +38,25 @@ use super::writer::CosWriters;
 use crate::raw::oio::PageLister;
 use crate::raw::*;
 use crate::services::CosConfig;
+use crate::types::OperatorUri;
 use crate::*;
 impl Configurator for CosConfig {
     type Builder = CosBuilder;
 
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
+
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
+        }
+
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
+        }
+
+        Self::from_iter(map)
+    }
+
     #[allow(deprecated)]
     fn into_builder(self) -> Self::Builder {
         CosBuilder {
@@ -221,7 +236,7 @@ impl Builder for CosBuilder {
             core: Arc::new(CosCore {
                 info: {
                     let am = AccessorInfo::default();
-                    am.set_scheme(DEFAULT_SCHEME)
+                    am.set_scheme(COS_SCHEME)
                         .set_root(&root)
                         .set_name(&bucket)
                         .set_native_capability(Capability {
@@ -440,3 +455,22 @@ impl Access for CosBackend {
         )))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Configurator;
+    use crate::types::OperatorUri;
+
+    #[test]
+    fn from_uri_extracts_bucket_and_root() {
+        let uri = OperatorUri::new(
+            "cos://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = CosConfig::from_uri(&uri).unwrap();
+        assert_eq!(cfg.bucket.as_deref(), Some("example-bucket"));
+        assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+    }
+}
diff --git a/core/src/services/cos/mod.rs b/core/src/services/cos/mod.rs
index 486597aa2..0de2ff00d 100644
--- a/core/src/services/cos/mod.rs
+++ b/core/src/services/cos/mod.rs
@@ -17,7 +17,7 @@
 
 /// Default scheme for cos service.
 #[cfg(feature = "services-cos")]
-pub(super) const DEFAULT_SCHEME: &str = "cos";
+pub const COS_SCHEME: &str = "cos";
 #[cfg(feature = "services-cos")]
 mod core;
 #[cfg(feature = "services-cos")]
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index a42b551c0..9e67c530f 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
 
@@ -30,30 +29,22 @@ use super::writer::FsWriter;
 use super::writer::FsWriters;
 use crate::raw::*;
 use crate::services::FsConfig;
+use crate::types::OperatorUri;
 use crate::*;
-use http::Uri;
-use percent_encoding::percent_decode_str;
 impl Configurator for FsConfig {
     type Builder = FsBuilder;
 
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
-
-        if !map.contains_key("root") {
-            let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-            if path.is_empty() || path == "/" {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "fs uri requires absolute path",
-                ));
-            }
-            if !path.starts_with('/') {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "fs uri root must be absolute",
-                ));
-            }
-            map.insert("root".to_string(), path.to_string());
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
+
+        if let Some(value) = match (uri.name(), uri.root()) {
+            (Some(name), Some(rest)) if !rest.is_empty() => Some(format!("/{}/{}", name, rest)),
+            (Some(name), _) => Some(format!("/{}", name)),
+            (None, Some(rest)) if !rest.is_empty() => Some(format!("/{}", rest)),
+            (None, Some(rest)) => Some(rest.to_string()),
+            _ => None,
+        } {
+            map.insert("root".to_string(), value);
         }
 
         Self::from_iter(map)
@@ -298,3 +289,22 @@ impl Access for FsBackend {
         Ok(RpRename::default())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Configurator;
+    use crate::types::OperatorUri;
+    use http::Uri;
+
+    #[test]
+    fn from_uri_extracts_root() {
+        let uri = OperatorUri::new(
+            Uri::from_static("fs://tmp/data"),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = FsConfig::from_uri(&uri).unwrap();
+        assert_eq!(cfg.root.as_deref(), Some("/tmp/data"));
+    }
+}
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index c99150ab3..6ba521f16 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -27,7 +27,7 @@ use reqsign::GoogleSigner;
 use reqsign::GoogleTokenLoad;
 use reqsign::GoogleTokenLoader;
 
-use super::DEFAULT_SCHEME;
+use super::GCS_SCHEME;
 use super::core::*;
 use super::delete::GcsDeleter;
 use super::error::parse_error;
@@ -37,6 +37,7 @@ use super::writer::GcsWriters;
 use crate::raw::oio::BatchDeleter;
 use crate::raw::*;
 use crate::services::GcsConfig;
+use crate::types::OperatorUri;
 use crate::*;
 const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
 const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
@@ -44,6 +45,20 @@ const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read
 impl Configurator for GcsConfig {
     type Builder = GcsBuilder;
 
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
+
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
+        }
+
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
+        }
+
+        Self::from_iter(map)
+    }
+
     #[allow(deprecated)]
     fn into_builder(self) -> Self::Builder {
         GcsBuilder {
@@ -307,7 +322,7 @@ impl Builder for GcsBuilder {
             core: Arc::new(GcsCore {
                 info: {
                     let am = AccessorInfo::default();
-                    am.set_scheme(DEFAULT_SCHEME)
+                    am.set_scheme(GCS_SCHEME)
                         .set_root(&root)
                         .set_name(bucket)
                         .set_native_capability(Capability {
@@ -497,3 +512,22 @@ impl Access for GcsBackend {
         )))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Configurator;
+    use crate::types::OperatorUri;
+
+    #[test]
+    fn from_uri_extracts_bucket_and_root() {
+        let uri = OperatorUri::new(
+            "gcs://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = GcsConfig::from_uri(&uri).unwrap();
+        assert_eq!(cfg.bucket, "example-bucket");
+        assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+    }
+}
diff --git a/core/src/services/gcs/mod.rs b/core/src/services/gcs/mod.rs
index a4d5d9712..0c8427158 100644
--- a/core/src/services/gcs/mod.rs
+++ b/core/src/services/gcs/mod.rs
@@ -17,7 +17,7 @@
 
 /// Default scheme for gcs service.
 #[cfg(feature = "services-gcs")]
-pub(super) const DEFAULT_SCHEME: &str = "gcs";
+pub const GCS_SCHEME: &str = "gcs";
 #[cfg(feature = "services-gcs")]
 mod core;
 #[cfg(feature = "services-gcs")]
diff --git a/core/src/services/memory/backend.rs b/core/src/services/memory/backend.rs
index 9def58cce..1e65f5419 100644
--- a/core/src/services/memory/backend.rs
+++ b/core/src/services/memory/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 
@@ -27,19 +26,16 @@ use super::writer::MemoryWriter;
 use crate::raw::oio;
 use crate::raw::*;
 use crate::services::MemoryConfig;
+use crate::types::OperatorUri;
 use crate::*;
-use http::Uri;
-use percent_encoding::percent_decode_str;
 impl Configurator for MemoryConfig {
     type Builder = MemoryBuilder;
 
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
-
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
         if !map.contains_key("root") {
-            let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-            if !path.is_empty() && path != "/" {
-                map.insert("root".to_string(), path.trim_start_matches('/').to_string());
+            if let Some(root) = uri.root().filter(|v| !v.is_empty()) {
+                map.insert("root".to_string(), root.to_string());
             }
         }
 
@@ -194,6 +190,8 @@ impl Access for MemoryAccessor {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::Configurator;
+    use crate::types::OperatorUri;
 
     #[test]
     fn test_accessor_metadata_name() {
@@ -203,4 +201,15 @@ mod tests {
         let b2 = MemoryBuilder::default().build().unwrap();
         assert_ne!(b1.info().name(), b2.info().name())
     }
+
+    #[test]
+    fn from_uri_extracts_root() {
+        let uri = OperatorUri::new(
+            "memory://localhost/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = MemoryConfig::from_uri(&uri).unwrap();
+        assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+    }
 }
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index bac66cc60..71d49ceaa 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -28,7 +28,7 @@ use reqsign::HuaweicloudObsConfig;
 use reqsign::HuaweicloudObsCredentialLoader;
 use reqsign::HuaweicloudObsSigner;
 
-use super::DEFAULT_SCHEME;
+use super::OBS_SCHEME;
 use super::core::ObsCore;
 use super::core::constants;
 use super::delete::ObsDeleter;
@@ -38,10 +38,25 @@ use super::writer::ObsWriter;
 use super::writer::ObsWriters;
 use crate::raw::*;
 use crate::services::ObsConfig;
+use crate::types::OperatorUri;
 use crate::*;
 impl Configurator for ObsConfig {
     type Builder = ObsBuilder;
 
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
+
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
+        }
+
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
+        }
+
+        Self::from_iter(map)
+    }
+
     #[allow(deprecated)]
     fn into_builder(self) -> Self::Builder {
         ObsBuilder {
@@ -226,7 +241,7 @@ impl Builder for ObsBuilder {
             core: Arc::new(ObsCore {
                 info: {
                     let am = AccessorInfo::default();
-                    am.set_scheme(DEFAULT_SCHEME)
+                    am.set_scheme(OBS_SCHEME)
                         .set_root(&root)
                         .set_name(&bucket)
                         .set_native_capability(Capability {
@@ -434,3 +449,22 @@ impl Access for ObsBackend {
         )))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Configurator;
+    use crate::types::OperatorUri;
+
+    #[test]
+    fn from_uri_extracts_bucket_and_root() {
+        let uri = OperatorUri::new(
+            "obs://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = ObsConfig::from_uri(&uri).unwrap();
+        assert_eq!(cfg.bucket.as_deref(), Some("example-bucket"));
+        assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+    }
+}
diff --git a/core/src/services/obs/mod.rs b/core/src/services/obs/mod.rs
index ed94697a1..c33719f6a 100644
--- a/core/src/services/obs/mod.rs
+++ b/core/src/services/obs/mod.rs
@@ -17,7 +17,7 @@
 
 /// Default scheme for obs service.
 #[cfg(feature = "services-obs")]
-pub(super) const DEFAULT_SCHEME: &str = "obs";
+pub const OBS_SCHEME: &str = "obs";
 #[cfg(feature = "services-obs")]
 mod core;
 #[cfg(feature = "services-obs")]
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 4d0d8cb62..0b2856c43 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -27,7 +27,7 @@ use reqsign::AliyunConfig;
 use reqsign::AliyunLoader;
 use reqsign::AliyunOssSigner;
 
-use super::DEFAULT_SCHEME;
+use super::OSS_SCHEME;
 use super::core::*;
 use super::delete::OssDeleter;
 use super::error::parse_error;
@@ -38,12 +38,27 @@ use super::writer::OssWriter;
 use super::writer::OssWriters;
 use crate::raw::*;
 use crate::services::OssConfig;
+use crate::types::OperatorUri;
 use crate::*;
 const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
 
 impl Configurator for OssConfig {
     type Builder = OssBuilder;
 
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
+
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
+        }
+
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
+        }
+
+        Self::from_iter(map)
+    }
+
     #[allow(deprecated)]
     fn into_builder(self) -> Self::Builder {
         OssBuilder {
@@ -511,7 +526,7 @@ impl Builder for OssBuilder {
             core: Arc::new(OssCore {
                 info: {
                     let am = AccessorInfo::default();
-                    am.set_scheme(DEFAULT_SCHEME)
+                    am.set_scheme(OSS_SCHEME)
                         .set_root(&root)
                         .set_name(bucket)
                         .set_native_capability(Capability {
@@ -732,3 +747,22 @@ impl Access for OssBackend {
         )))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Configurator;
+    use crate::types::OperatorUri;
+
+    #[test]
+    fn from_uri_extracts_bucket_and_root() {
+        let uri = OperatorUri::new(
+            "oss://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = OssConfig::from_uri(&uri).unwrap();
+        assert_eq!(cfg.bucket, "example-bucket");
+        assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+    }
+}
diff --git a/core/src/services/oss/mod.rs b/core/src/services/oss/mod.rs
index e27e6a78b..e3e05c1b6 100644
--- a/core/src/services/oss/mod.rs
+++ b/core/src/services/oss/mod.rs
@@ -17,7 +17,7 @@
 
 /// Default scheme for oss service.
 #[cfg(feature = "services-oss")]
-pub(super) const DEFAULT_SCHEME: &str = "oss";
+pub const OSS_SCHEME: &str = "oss";
 #[cfg(feature = "services-oss")]
 mod core;
 #[cfg(feature = "services-oss")]
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 4570ce6b0..309859dab 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -30,12 +30,10 @@ use constants::X_AMZ_META_PREFIX;
 use constants::X_AMZ_VERSION_ID;
 use http::Response;
 use http::StatusCode;
-use http::Uri;
 use log::debug;
 use log::warn;
 use md5::Digest;
 use md5::Md5;
-use percent_encoding::percent_decode_str;
 use reqsign::AwsAssumeRoleLoader;
 use reqsign::AwsConfig;
 use reqsign::AwsCredentialLoad;
@@ -56,6 +54,7 @@ use super::writer::S3Writers;
 use crate::raw::oio::PageLister;
 use crate::raw::*;
 use crate::services::S3Config;
+use crate::types::OperatorUri;
 use crate::*;
 
 /// Allow constructing correct region endpoint if user gives a global endpoint.
@@ -74,25 +73,15 @@ const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
 impl Configurator for S3Config {
     type Builder = S3Builder;
 
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
-
-        let bucket_missing = map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
-        if bucket_missing {
-            let bucket = uri
-                .authority()
-                .map(|authority| authority.host())
-                .filter(|host| !host.is_empty())
-                .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "s3 uri requires bucket"))?;
-            map.insert("bucket".to_string(), bucket.to_string());
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
+
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
         }
 
-        if !map.contains_key("root") {
-            let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-            let trimmed = path.trim_matches('/');
-            if !trimmed.is_empty() {
-                map.insert("root".to_string(), trimmed.to_string());
-            }
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
         }
 
         Self::from_iter(map)
@@ -1188,6 +1177,8 @@ impl Access for S3Backend {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::Configurator;
+    use crate::types::OperatorUri;
 
     #[test]
     fn test_is_valid_bucket() {
@@ -1290,4 +1281,16 @@ mod tests {
             assert_eq!(region.as_deref(), expected, "{name}");
         }
     }
+
+    #[test]
+    fn from_uri_extracts_bucket_and_root() {
+        let uri = OperatorUri::new(
+            "s3://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = S3Config::from_uri(&uri).unwrap();
+        assert_eq!(cfg.bucket, "example-bucket");
+        assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+    }
 }
diff --git a/core/src/services/upyun/backend.rs b/core/src/services/upyun/backend.rs
index 5f03a7241..412011e49 100644
--- a/core/src/services/upyun/backend.rs
+++ b/core/src/services/upyun/backend.rs
@@ -23,7 +23,7 @@ use http::Response;
 use http::StatusCode;
 use log::debug;
 
-use super::DEFAULT_SCHEME;
+use super::UPYUN_SCHEME;
 use super::core::*;
 use super::delete::UpyunDeleter;
 use super::error::parse_error;
@@ -32,10 +32,25 @@ use super::writer::UpyunWriter;
 use super::writer::UpyunWriters;
 use crate::raw::*;
 use crate::services::UpyunConfig;
+use crate::types::OperatorUri;
 use crate::*;
 impl Configurator for UpyunConfig {
     type Builder = UpyunBuilder;
 
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
+
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
+        }
+
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
+        }
+
+        Self::from_iter(map)
+    }
+
     #[allow(deprecated)]
     fn into_builder(self) -> Self::Builder {
         UpyunBuilder {
@@ -169,7 +184,7 @@ impl Builder for UpyunBuilder {
             core: Arc::new(UpyunCore {
                 info: {
                     let am = AccessorInfo::default();
-                    am.set_scheme(DEFAULT_SCHEME)
+                    am.set_scheme(UPYUN_SCHEME)
                         .set_root(&root)
                         .set_native_capability(Capability {
                             stat: true,
@@ -315,3 +330,22 @@ impl Access for UpyunBackend {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Configurator;
+    use crate::types::OperatorUri;
+
+    #[test]
+    fn from_uri_extracts_bucket_and_root() {
+        let uri = OperatorUri::new(
+            "upyun://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = UpyunConfig::from_uri(&uri).unwrap();
+        assert_eq!(cfg.bucket, "example-bucket");
+        assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+    }
+}
diff --git a/core/src/services/upyun/mod.rs b/core/src/services/upyun/mod.rs
index eee374681..50db7493e 100644
--- a/core/src/services/upyun/mod.rs
+++ b/core/src/services/upyun/mod.rs
@@ -17,7 +17,7 @@
 
 /// Default scheme for upyun service.
 #[cfg(feature = "services-upyun")]
-pub(super) const DEFAULT_SCHEME: &str = "upyun";
+pub const UPYUN_SCHEME: &str = "upyun";
 #[cfg(feature = "services-upyun")]
 mod core;
 #[cfg(feature = "services-upyun")]
diff --git a/core/src/types/builder.rs b/core/src/types/builder.rs
index c91363ed7..903459775 100644
--- a/core/src/types/builder.rs
+++ b/core/src/types/builder.rs
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::fmt::Debug;
 
-use http::Uri;
 use serde::Serialize;
 use serde::de::DeserializeOwned;
 
 use crate::raw::*;
+use crate::types::OperatorUri;
 use crate::*;
 
 /// Builder is used to set up underlying services.
@@ -125,8 +124,8 @@ pub trait Configurator: Serialize + DeserializeOwned + Debug + 'static {
     /// Associated builder for this configuration.
     type Builder: Builder;
 
-    /// Build configuration from a URI plus merged options.
-    fn from_uri(_uri: &Uri, _options: &HashMap<String, String>) -> Result<Self> {
+    /// Build configuration from a parsed URI plus merged options.
+    fn from_uri(_uri: &OperatorUri) -> Result<Self> {
         Err(Error::new(ErrorKind::Unsupported, "uri is not supported"))
     }
 
diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs
index b57155646..79f8daf36 100644
--- a/core/src/types/mod.rs
+++ b/core/src/types/mod.rs
@@ -44,11 +44,13 @@ pub use execute::*;
 
 mod operator;
 pub use operator::DEFAULT_OPERATOR_REGISTRY;
+pub use operator::IntoOperatorUri;
 pub use operator::Operator;
 pub use operator::OperatorBuilder;
 pub use operator::OperatorFactory;
 pub use operator::OperatorInfo;
 pub use operator::OperatorRegistry;
+pub use operator::OperatorUri;
 pub use operator::operator_futures;
 
 mod builder;
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 22a10e274..e2628d828 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 
 use crate::layers::*;
 use crate::raw::*;
+use crate::types::IntoOperatorUri;
 use crate::*;
 
 /// # Operator build API
@@ -141,16 +142,13 @@ impl Operator {
     /// use opendal::Operator;
     ///
     /// # fn example() -> Result<()> {
-    /// let op = Operator::from_uri("memory://localhost/", [])?;
+    /// let op = Operator::from_uri("memory://localhost/")?;
     /// # let _ = op;
     /// # Ok(())
     /// # }
     /// ```
-    pub fn from_uri(
-        uri: &str,
-        options: impl IntoIterator<Item = (String, String)>,
-    ) -> Result<Operator> {
-        crate::DEFAULT_OPERATOR_REGISTRY.load(uri, options)
+    pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Operator> {
+        crate::DEFAULT_OPERATOR_REGISTRY.load(uri)
     }
 
     /// Create a new operator via given scheme and iterator of config value in dynamic dispatch.
diff --git a/core/src/types/operator/mod.rs b/core/src/types/operator/mod.rs
index c42f6d281..3de1ea7c1 100644
--- a/core/src/types/operator/mod.rs
+++ b/core/src/types/operator/mod.rs
@@ -31,3 +31,6 @@ pub mod operator_futures;
 
 mod registry;
 pub use registry::{DEFAULT_OPERATOR_REGISTRY, OperatorFactory, OperatorRegistry};
+
+mod uri;
+pub use uri::{IntoOperatorUri, OperatorUri};
diff --git a/core/src/types/operator/registry.rs b/core/src/types/operator/registry.rs
index 7f11c003f..006c6d843 100644
--- a/core/src/types/operator/registry.rs
+++ b/core/src/types/operator/registry.rs
@@ -18,15 +18,13 @@
 use std::collections::HashMap;
 use std::sync::{LazyLock, Mutex};
 
-use http::Uri;
-use percent_encoding::percent_decode_str;
-
 use crate::services;
 use crate::types::builder::{Builder, Configurator};
+use crate::types::{IntoOperatorUri, OperatorUri};
 use crate::{Error, ErrorKind, Operator, Result};
 
 /// Factory signature used to construct [`Operator`] from a URI and extra options.
-pub type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
+pub type OperatorFactory = fn(&OperatorUri) -> Result<Operator>;
 
 /// Default registry initialized with builtin services.
 pub static DEFAULT_OPERATOR_REGISTRY: LazyLock<OperatorRegistry> = LazyLock::new(|| {
@@ -60,33 +58,22 @@ impl OperatorRegistry {
     }
 
     /// Load an [`Operator`] via the factory registered for the URI's scheme.
-    pub fn load(
-        &self,
-        uri: &str,
-        options: impl IntoIterator<Item = (String, String)>,
-    ) -> Result<Operator> {
-        let parsed = uri.parse::<Uri>().map_err(|err| {
-            Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
-        })?;
-
-        let scheme = parsed
-            .scheme_str()
-            .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))?;
+    pub fn load(&self, uri: impl IntoOperatorUri) -> Result<Operator> {
+        let parsed = uri.into_operator_uri()?;
+        let scheme = parsed.scheme();
 
-        let key = scheme.to_ascii_lowercase();
         let factory = self
             .factories
             .lock()
             .expect("operator registry mutex poisoned")
-            .get(key.as_str())
+            .get(scheme)
             .copied()
             .ok_or_else(|| {
                 Error::new(ErrorKind::Unsupported, "scheme is not registered")
-                    .with_context("scheme", scheme)
+                    .with_context("scheme", scheme.to_string())
             })?;
 
-        let opts: Vec<(String, String)> = options.into_iter().collect();
-        factory(uri, opts)
+        factory(&parsed)
     }
 }
 
@@ -97,33 +84,24 @@ fn register_builtin_services(registry: &OperatorRegistry) {
     registry.register::<services::Fs>(services::FS_SCHEME);
     #[cfg(feature = "services-s3")]
     registry.register::<services::S3>(services::S3_SCHEME);
+    #[cfg(feature = "services-azblob")]
+    registry.register::<services::Azblob>(services::AZBLOB_SCHEME);
+    #[cfg(feature = "services-b2")]
+    registry.register::<services::B2>(services::B2_SCHEME);
+    #[cfg(feature = "services-cos")]
+    registry.register::<services::Cos>(services::COS_SCHEME);
+    #[cfg(feature = "services-gcs")]
+    registry.register::<services::Gcs>(services::GCS_SCHEME);
+    #[cfg(feature = "services-obs")]
+    registry.register::<services::Obs>(services::OBS_SCHEME);
+    #[cfg(feature = "services-oss")]
+    registry.register::<services::Oss>(services::OSS_SCHEME);
+    #[cfg(feature = "services-upyun")]
+    registry.register::<services::Upyun>(services::UPYUN_SCHEME);
 }
 
 /// Factory adapter that builds an operator from a configurator type.
-fn factory<C: Configurator>(uri: &str, options: Vec<(String, String)>) -> Result<Operator> {
-    let parsed = uri.parse::<Uri>().map_err(|err| {
-        Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
-    })?;
-
-    let mut params = HashMap::new();
-    if let Some(query) = parsed.query() {
-        for pair in query.split('&') {
-            if pair.is_empty() {
-                continue;
-            }
-            let mut parts = pair.splitn(2, '=');
-            let key = parts.next().unwrap_or("");
-            let value = parts.next().unwrap_or("");
-            let key = percent_decode_str(key).decode_utf8_lossy().to_string();
-            let value = percent_decode_str(value).decode_utf8_lossy().to_string();
-            params.insert(key.to_ascii_lowercase(), value);
-        }
-    }
-
-    for (key, value) in options {
-        params.insert(key.to_ascii_lowercase(), value);
-    }
-
-    let cfg = C::from_uri(&parsed, &params)?;
+fn factory<C: Configurator>(uri: &OperatorUri) -> Result<Operator> {
+    let cfg = C::from_uri(uri)?;
     Ok(Operator::from_config(cfg)?.finish())
 }
diff --git a/core/src/types/operator/uri.rs b/core/src/types/operator/uri.rs
new file mode 100644
index 000000000..7cfc7900c
--- /dev/null
+++ b/core/src/types/operator/uri.rs
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use http::Uri;
+use percent_encoding::percent_decode_str;
+
+use crate::{Error, ErrorKind, Result};
+
+/// Parsed representation of an operator URI with normalized components.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct OperatorUri {
+    scheme: String,
+    name: Option<String>,
+    root: Option<String>,
+    options: HashMap<String, String>,
+}
+
+impl OperatorUri {
+    /// Build [`OperatorUri`] from a [`Uri`] plus additional options.
+    pub fn new(
+        uri: Uri,
+        extra_options: impl IntoIterator<Item = (String, String)>,
+    ) -> Result<Self> {
+        let scheme = uri
+            .scheme_str()
+            .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))?
+            .to_ascii_lowercase();
+
+        let mut options = HashMap::<String, String>::new();
+
+        if let Some(query) = uri.query() {
+            for pair in query.split('&') {
+                if pair.is_empty() {
+                    continue;
+                }
+                let mut parts = pair.splitn(2, '=');
+                let key = parts.next().unwrap_or("");
+                let value = parts.next().unwrap_or("");
+                let key = percent_decode_str(key)
+                    .decode_utf8_lossy()
+                    .to_ascii_lowercase();
+                let value = percent_decode_str(value).decode_utf8_lossy().to_string();
+                options.insert(key, value);
+            }
+        }
+
+        for (key, value) in extra_options {
+            options.insert(key.to_ascii_lowercase(), value);
+        }
+
+        let name = uri.authority().and_then(|authority| {
+            let host = authority.host();
+            if host.is_empty() {
+                None
+            } else {
+                Some(host.to_string())
+            }
+        });
+
+        let decoded_path = percent_decode_str(uri.path()).decode_utf8_lossy();
+        let trimmed = decoded_path.trim_matches('/');
+        let root = if trimmed.is_empty() {
+            None
+        } else {
+            Some(trimmed.to_string())
+        };
+
+        Ok(Self {
+            scheme,
+            name,
+            root,
+            options,
+        })
+    }
+
+    /// Normalized scheme in lowercase.
+    pub fn scheme(&self) -> &str {
+        self.scheme.as_str()
+    }
+
+    /// Name extracted from the URI authority, if present.
+    pub fn name(&self) -> Option<&str> {
+        self.name.as_deref()
+    }
+
+    /// Root path (without leading slash) extracted from the URI path, if present.
+    pub fn root(&self) -> Option<&str> {
+        self.root.as_deref()
+    }
+
+    /// Normalized option map merged from query string and extra options (excluding reserved keys).
+    pub fn options(&self) -> &HashMap<String, String> {
+        &self.options
+    }
+}
+
+/// Conversion trait that builds [`OperatorUri`] from various inputs.
+pub trait IntoOperatorUri {
+    /// Convert the input into an [`OperatorUri`].
+    fn into_operator_uri(self) -> Result<OperatorUri>;
+}
+
+impl IntoOperatorUri for OperatorUri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        Ok(self)
+    }
+}
+
+impl IntoOperatorUri for &OperatorUri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        Ok(self.clone())
+    }
+}
+
+impl IntoOperatorUri for Uri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        OperatorUri::new(self, Vec::<(String, String)>::new())
+    }
+}
+
+impl IntoOperatorUri for &Uri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        OperatorUri::new(self.clone(), Vec::<(String, String)>::new())
+    }
+}
+
+impl IntoOperatorUri for &str {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let uri = self.parse::<Uri>().map_err(|err| {
+            Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
+        })?;
+        OperatorUri::new(uri, Vec::<(String, String)>::new())
+    }
+}
+
+impl IntoOperatorUri for String {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let uri = self.parse::<Uri>().map_err(|err| {
+            Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
+        })?;
+        OperatorUri::new(uri, Vec::<(String, String)>::new())
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (Uri, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (uri, extra) = self;
+        let opts = extra
+            .into_iter()
+            .map(|(k, v)| (k.into(), v.into()))
+            .collect::<Vec<_>>();
+        OperatorUri::new(uri, opts)
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (&Uri, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (uri, extra) = self;
+        let opts = extra
+            .into_iter()
+            .map(|(k, v)| (k.into(), v.into()))
+            .collect::<Vec<_>>();
+        OperatorUri::new(uri.clone(), opts)
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (&str, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (base, extra) = self;
+        let uri = base.parse::<Uri>().map_err(|err| {
+            Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
+        })?;
+        let opts = extra
+            .into_iter()
+            .map(|(k, v)| (k.into(), v.into()))
+            .collect::<Vec<_>>();
+        OperatorUri::new(uri, opts)
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (String, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (base, extra) = self;
+        (&base[..], extra).into_operator_uri()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::types::IntoOperatorUri;
+
+    #[test]
+    fn parse_uri_with_name_and_root() {
+        let uri = OperatorUri::new(
+            "s3://example-bucket/photos/2024".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+
+        assert_eq!(uri.scheme(), "s3");
+        assert_eq!(uri.name(), Some("example-bucket"));
+        assert_eq!(uri.root(), Some("photos/2024"));
+        assert!(uri.options().is_empty());
+    }
+
+    #[test]
+    fn into_operator_uri_merges_extra_options() {
+        let uri = (
+            "s3://bucket/path?region=us-east-1",
+            vec![("region", "override"), ("endpoint", "https://custom")],
+        )
+            .into_operator_uri()
+            .unwrap();
+
+        assert_eq!(uri.scheme(), "s3");
+        assert_eq!(uri.name(), Some("bucket"));
+        assert_eq!(uri.root(), Some("path"));
+        assert_eq!(
+            uri.options().get("region").map(String::as_str),
+            Some("override")
+        );
+        assert_eq!(
+            uri.options().get("endpoint").map(String::as_str),
+            Some("https://custom")
+        );
+    }
+}

Reply via email to