This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new bb145e66d feat: Add from_uri support for more object storage services (#6665)
bb145e66d is described below
commit bb145e66d2040ec201d1aaf0d4a1e2a30720c7d9
Author: Xuanwo <[email protected]>
AuthorDate: Tue Oct 14 17:39:27 2025 +0900
feat: Add from_uri support for more object storage services (#6665)
* feat: Add from_uri support for all object storage services
Signed-off-by: Xuanwo <[email protected]>
* Introduce operator uri
Signed-off-by: Xuanwo <[email protected]>
* Fix tests
Signed-off-by: Xuanwo <[email protected]>
* Fix fs
Signed-off-by: Xuanwo <[email protected]>
* Make clippy happy
Signed-off-by: Xuanwo <[email protected]>
* Format code
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/blocking/operator.rs | 8 +-
core/src/docs/rfcs/5444_operator_from_uri.md | 32 ++--
core/src/services/azblob/backend.rs | 50 ++++-
core/src/services/azblob/mod.rs | 2 +-
core/src/services/b2/backend.rs | 40 +++-
core/src/services/b2/mod.rs | 2 +-
core/src/services/cos/backend.rs | 38 +++-
core/src/services/cos/mod.rs | 2 +-
core/src/services/fs/backend.rs | 52 +++---
core/src/services/gcs/backend.rs | 38 +++-
core/src/services/gcs/mod.rs | 2 +-
core/src/services/memory/backend.rs | 27 ++-
core/src/services/obs/backend.rs | 38 +++-
core/src/services/obs/mod.rs | 2 +-
core/src/services/oss/backend.rs | 38 +++-
core/src/services/oss/mod.rs | 2 +-
core/src/services/s3/backend.rs | 41 +++--
core/src/services/upyun/backend.rs | 38 +++-
core/src/services/upyun/mod.rs | 2 +-
core/src/types/builder.rs | 7 +-
core/src/types/mod.rs | 2 +
core/src/types/operator/builder.rs | 10 +-
core/src/types/operator/mod.rs | 3 +
core/src/types/operator/registry.rs | 70 +++----
core/src/types/operator/uri.rs | 264 +++++++++++++++++++++++++++
25 files changed, 660 insertions(+), 150 deletions(-)
diff --git a/core/src/blocking/operator.rs b/core/src/blocking/operator.rs
index 0ae55953f..037b65a19 100644
--- a/core/src/blocking/operator.rs
+++ b/core/src/blocking/operator.rs
@@ -18,6 +18,7 @@
use tokio::runtime::Handle;
use crate::Operator as AsyncOperator;
+use crate::types::IntoOperatorUri;
use crate::*;
/// Use OpenDAL in blocking context.
@@ -136,11 +137,8 @@ impl Operator {
}
/// Create a blocking operator from URI based configuration.
- pub fn from_uri(
- uri: &str,
- options: impl IntoIterator<Item = (String, String)>,
- ) -> Result<Self> {
- let op = AsyncOperator::from_uri(uri, options)?;
+ pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Self> {
+ let op = AsyncOperator::from_uri(uri)?;
Self::new(op)
}
diff --git a/core/src/docs/rfcs/5444_operator_from_uri.md b/core/src/docs/rfcs/5444_operator_from_uri.md
index 5b5f21c2d..5bb203d5b 100644
--- a/core/src/docs/rfcs/5444_operator_from_uri.md
+++ b/core/src/docs/rfcs/5444_operator_from_uri.md
@@ -24,18 +24,17 @@ The new API allows creating operators directly from URIs:
```rust
// Create an operator using URI
-let op = Operator::from_uri("s3://my-bucket/path", vec![
- ("endpoint".to_string(), "http://localhost:8080"to_string()),
-])?;
+let op = Operator::from_uri("s3://my-bucket/path")?;
// Users can pass options through the URI along with additional key-value pairs
// The extra options will override identical options specified in the URI
-let op = Operator::from_uri("s3://my-bucket/path?region=us-east-1", vec![
- ("endpoint".to_string(), "http://localhost:8080"to_string()),
-])?;
+let op = Operator::from_uri((
+ "s3://my-bucket/path?region=us-east-1",
+ [("endpoint", "http://localhost:8080")],
+))?;
// Create a file system operator
-let op = Operator::from_uri("fs:///tmp/test", vec![])?;
+let op = Operator::from_uri("fs:///tmp/test")?;
```
OpenDAL will, by default, register services enabled by features in a global
`OperatorRegistry`. Users can also create custom operator registries to support
their own schemes or additional options.
@@ -53,7 +52,7 @@ registry.register::<services::S3>("r2"); // Cloudflare R2 is S3-compatible
registry.register::<services::S3>("company-storage");
registry.register::<services::Azblob>("backup-storage");
-let op = registry.load("company-storage://bucket/path", [])?;
+let op = registry.load("company-storage://bucket/path")?;
```
# Reference-level explanation
@@ -62,12 +61,10 @@ The implementation consists of three main components:
1. The `OperatorFactory` and `OperatorRegistry`:
-`OperatorFactory` is a function type that takes a URI string plus options and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes.
-
-`OperatorFactory` is a function type that takes a URI string plus options and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes.
+`OperatorFactory` is a function type that takes a parsed `OperatorUri` and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes.
```rust
-type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
+type OperatorFactory = fn(&OperatorUri) -> Result<Operator>;
pub struct OperatorRegistry { ... }
@@ -76,7 +73,7 @@ impl OperatorRegistry {
...
}
- fn load(&self, uri: &str, options: impl IntoIterator<Item = (String,
String)>) -> Result<Operator> {
+ fn load(&self, uri: impl IntoOperatorUri) -> Result<Operator> {
...
}
}
@@ -84,11 +81,11 @@ impl OperatorRegistry {
2. The `Configurator` trait extension:
-`Configurator` will add a new API to create a configuration from a URI and options. Services should only parse the URI components relevant to their configuration (host, path, query parameters) without concerning themselves with the scheme portion.
+`Configurator` will add a new API to create a configuration from a parsed `OperatorUri`. Services should only inspect the URI components relevant to their configuration (name, root, options) without concerning themselves with the scheme portion.
```rust
impl Configurator for S3Config {
- fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
+ fn from_uri(uri: &OperatorUri) -> Result<Self> {
...
}
}
@@ -102,10 +99,7 @@ The `Operator` trait will add a new `from_uri` method to create an operator from
```rust
impl Operator {
- pub fn from_uri(
- uri: &str,
- options: impl IntoIterator<Item = (String, String)>,
- ) -> Result<Self> {
+ pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Self> {
...
}
}
diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs
index db34c83d8..df81c11a7 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -30,7 +30,7 @@ use reqsign::AzureStorageSigner;
use sha2::Digest;
use sha2::Sha256;
-use super::DEFAULT_SCHEME;
+use super::AZBLOB_SCHEME;
use super::core::AzblobCore;
use super::core::constants::X_MS_META_PREFIX;
use super::core::constants::X_MS_VERSION_ID;
@@ -41,6 +41,7 @@ use super::writer::AzblobWriter;
use super::writer::AzblobWriters;
use crate::raw::*;
use crate::services::AzblobConfig;
+use crate::types::OperatorUri;
use crate::*;
const AZBLOB_BATCH_LIMIT: usize = 256;
@@ -59,6 +60,20 @@ impl From<AzureStorageConfig> for AzblobConfig {
impl Configurator for AzblobConfig {
type Builder = AzblobBuilder;
+ fn from_uri(uri: &OperatorUri) -> Result<Self> {
+ let mut map = uri.options().clone();
+
+ if let Some(container) = uri.name() {
+ map.insert("container".to_string(), container.to_string());
+ }
+
+ if let Some(root) = uri.root() {
+ map.insert("root".to_string(), root.to_string());
+ }
+
+ Self::from_iter(map)
+ }
+
#[allow(deprecated)]
fn into_builder(self) -> Self::Builder {
AzblobBuilder {
@@ -397,7 +412,7 @@ impl Builder for AzblobBuilder {
core: Arc::new(AzblobCore {
info: {
let am = AccessorInfo::default();
- am.set_scheme(DEFAULT_SCHEME)
+ am.set_scheme(AZBLOB_SCHEME)
.set_root(&root)
.set_name(container)
.set_native_capability(Capability {
@@ -592,3 +607,34 @@ impl Access for AzblobBackend {
)))
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::Configurator;
+ use crate::types::OperatorUri;
+
+ #[test]
+ fn from_uri_with_host_container() {
+ let uri = OperatorUri::new(
+ "azblob://my-container/path/to/root".parse().unwrap(),
+ Vec::<(String, String)>::new(),
+ )
+ .unwrap();
+ let cfg = AzblobConfig::from_uri(&uri).unwrap();
+ assert_eq!(cfg.container, "my-container");
+ assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+ }
+
+ #[test]
+ fn from_uri_with_path_container() {
+ let uri = OperatorUri::new(
+ "azblob://my-container/nested/root".parse().unwrap(),
+ Vec::<(String, String)>::new(),
+ )
+ .unwrap();
+ let cfg = AzblobConfig::from_uri(&uri).unwrap();
+ assert_eq!(cfg.container, "my-container");
+ assert_eq!(cfg.root.as_deref(), Some("nested/root"));
+ }
+}
diff --git a/core/src/services/azblob/mod.rs b/core/src/services/azblob/mod.rs
index 818ed4628..f014a29ef 100644
--- a/core/src/services/azblob/mod.rs
+++ b/core/src/services/azblob/mod.rs
@@ -17,7 +17,7 @@
/// Default scheme for azblob service.
#[cfg(feature = "services-azblob")]
-pub(super) const DEFAULT_SCHEME: &str = "azblob";
+pub const AZBLOB_SCHEME: &str = "azblob";
#[cfg(feature = "services-azblob")]
pub(crate) mod core;
#[cfg(feature = "services-azblob")]
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
index 39148a76f..d545fce5a 100644
--- a/core/src/services/b2/backend.rs
+++ b/core/src/services/b2/backend.rs
@@ -25,7 +25,7 @@ use http::StatusCode;
use log::debug;
use tokio::sync::RwLock;
-use super::DEFAULT_SCHEME;
+use super::B2_SCHEME;
use super::core::B2Core;
use super::core::B2Signer;
use super::core::constants;
@@ -37,10 +37,25 @@ use super::writer::B2Writer;
use super::writer::B2Writers;
use crate::raw::*;
use crate::services::B2Config;
+use crate::types::OperatorUri;
use crate::*;
impl Configurator for B2Config {
type Builder = B2Builder;
+ fn from_uri(uri: &OperatorUri) -> Result<Self> {
+ let mut map = uri.options().clone();
+
+ if let Some(name) = uri.name() {
+ map.insert("bucket".to_string(), name.to_string());
+ }
+
+ if let Some(root) = uri.root() {
+ map.insert("root".to_string(), root.to_string());
+ }
+
+ Self::from_iter(map)
+ }
+
#[allow(deprecated)]
fn into_builder(self) -> Self::Builder {
B2Builder {
@@ -191,7 +206,7 @@ impl Builder for B2Builder {
core: Arc::new(B2Core {
info: {
let am = AccessorInfo::default();
- am.set_scheme(DEFAULT_SCHEME)
+ am.set_scheme(B2_SCHEME)
.set_root(&root)
.set_native_capability(Capability {
stat: true,
@@ -432,3 +447,24 @@ impl Access for B2Backend {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::Configurator;
+ use crate::types::OperatorUri;
+
+ #[test]
+ fn from_uri_extracts_bucket_and_root() {
+ let uri = OperatorUri::new(
+ "b2://example-bucket/path/to/root".parse().unwrap(),
+ vec![("bucket_id".to_string(), "bucket-id".to_string())],
+ )
+ .unwrap();
+
+ let cfg = B2Config::from_uri(&uri).unwrap();
+ assert_eq!(cfg.bucket, "example-bucket");
+ assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+ assert_eq!(cfg.bucket_id, "bucket-id");
+ }
+}
diff --git a/core/src/services/b2/mod.rs b/core/src/services/b2/mod.rs
index 09c6b4bfe..6a4778994 100644
--- a/core/src/services/b2/mod.rs
+++ b/core/src/services/b2/mod.rs
@@ -17,7 +17,7 @@
/// Default scheme for b2 service.
#[cfg(feature = "services-b2")]
-pub(super) const DEFAULT_SCHEME: &str = "b2";
+pub const B2_SCHEME: &str = "b2";
#[cfg(feature = "services-b2")]
mod core;
#[cfg(feature = "services-b2")]
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 93969e28c..780e1aa78 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -26,7 +26,7 @@ use reqsign::TencentCosConfig;
use reqsign::TencentCosCredentialLoader;
use reqsign::TencentCosSigner;
-use super::DEFAULT_SCHEME;
+use super::COS_SCHEME;
use super::core::*;
use super::delete::CosDeleter;
use super::error::parse_error;
@@ -38,10 +38,25 @@ use super::writer::CosWriters;
use crate::raw::oio::PageLister;
use crate::raw::*;
use crate::services::CosConfig;
+use crate::types::OperatorUri;
use crate::*;
impl Configurator for CosConfig {
type Builder = CosBuilder;
+ fn from_uri(uri: &OperatorUri) -> Result<Self> {
+ let mut map = uri.options().clone();
+
+ if let Some(name) = uri.name() {
+ map.insert("bucket".to_string(), name.to_string());
+ }
+
+ if let Some(root) = uri.root() {
+ map.insert("root".to_string(), root.to_string());
+ }
+
+ Self::from_iter(map)
+ }
+
#[allow(deprecated)]
fn into_builder(self) -> Self::Builder {
CosBuilder {
@@ -221,7 +236,7 @@ impl Builder for CosBuilder {
core: Arc::new(CosCore {
info: {
let am = AccessorInfo::default();
- am.set_scheme(DEFAULT_SCHEME)
+ am.set_scheme(COS_SCHEME)
.set_root(&root)
.set_name(&bucket)
.set_native_capability(Capability {
@@ -440,3 +455,22 @@ impl Access for CosBackend {
)))
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::Configurator;
+ use crate::types::OperatorUri;
+
+ #[test]
+ fn from_uri_extracts_bucket_and_root() {
+ let uri = OperatorUri::new(
+ "cos://example-bucket/path/to/root".parse().unwrap(),
+ Vec::<(String, String)>::new(),
+ )
+ .unwrap();
+ let cfg = CosConfig::from_uri(&uri).unwrap();
+ assert_eq!(cfg.bucket.as_deref(), Some("example-bucket"));
+ assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+ }
+}
diff --git a/core/src/services/cos/mod.rs b/core/src/services/cos/mod.rs
index 486597aa2..0de2ff00d 100644
--- a/core/src/services/cos/mod.rs
+++ b/core/src/services/cos/mod.rs
@@ -17,7 +17,7 @@
/// Default scheme for cos service.
#[cfg(feature = "services-cos")]
-pub(super) const DEFAULT_SCHEME: &str = "cos";
+pub const COS_SCHEME: &str = "cos";
#[cfg(feature = "services-cos")]
mod core;
#[cfg(feature = "services-cos")]
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index a42b551c0..9e67c530f 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
@@ -30,30 +29,22 @@ use super::writer::FsWriter;
use super::writer::FsWriters;
use crate::raw::*;
use crate::services::FsConfig;
+use crate::types::OperatorUri;
use crate::*;
-use http::Uri;
-use percent_encoding::percent_decode_str;
impl Configurator for FsConfig {
type Builder = FsBuilder;
- fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
- let mut map = options.clone();
-
- if !map.contains_key("root") {
- let path = percent_decode_str(uri.path()).decode_utf8_lossy();
- if path.is_empty() || path == "/" {
- return Err(Error::new(
- ErrorKind::ConfigInvalid,
- "fs uri requires absolute path",
- ));
- }
- if !path.starts_with('/') {
- return Err(Error::new(
- ErrorKind::ConfigInvalid,
- "fs uri root must be absolute",
- ));
- }
- map.insert("root".to_string(), path.to_string());
+ fn from_uri(uri: &OperatorUri) -> Result<Self> {
+ let mut map = uri.options().clone();
+
+ if let Some(value) = match (uri.name(), uri.root()) {
+ (Some(name), Some(rest)) if !rest.is_empty() =>
Some(format!("/{}/{}", name, rest)),
+ (Some(name), _) => Some(format!("/{}", name)),
+ (None, Some(rest)) if !rest.is_empty() => Some(format!("/{}",
rest)),
+ (None, Some(rest)) => Some(rest.to_string()),
+ _ => None,
+ } {
+ map.insert("root".to_string(), value);
}
Self::from_iter(map)
@@ -298,3 +289,22 @@ impl Access for FsBackend {
Ok(RpRename::default())
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::Configurator;
+ use crate::types::OperatorUri;
+ use http::Uri;
+
+ #[test]
+ fn from_uri_extracts_root() {
+ let uri = OperatorUri::new(
+ Uri::from_static("fs://tmp/data"),
+ Vec::<(String, String)>::new(),
+ )
+ .unwrap();
+ let cfg = FsConfig::from_uri(&uri).unwrap();
+ assert_eq!(cfg.root.as_deref(), Some("/tmp/data"));
+ }
+}
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index c99150ab3..6ba521f16 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -27,7 +27,7 @@ use reqsign::GoogleSigner;
use reqsign::GoogleTokenLoad;
use reqsign::GoogleTokenLoader;
-use super::DEFAULT_SCHEME;
+use super::GCS_SCHEME;
use super::core::*;
use super::delete::GcsDeleter;
use super::error::parse_error;
@@ -37,6 +37,7 @@ use super::writer::GcsWriters;
use crate::raw::oio::BatchDeleter;
use crate::raw::*;
use crate::services::GcsConfig;
+use crate::types::OperatorUri;
use crate::*;
const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
const DEFAULT_GCS_SCOPE: &str =
"https://www.googleapis.com/auth/devstorage.read_write";
@@ -44,6 +45,20 @@ const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read
impl Configurator for GcsConfig {
type Builder = GcsBuilder;
+ fn from_uri(uri: &OperatorUri) -> Result<Self> {
+ let mut map = uri.options().clone();
+
+ if let Some(name) = uri.name() {
+ map.insert("bucket".to_string(), name.to_string());
+ }
+
+ if let Some(root) = uri.root() {
+ map.insert("root".to_string(), root.to_string());
+ }
+
+ Self::from_iter(map)
+ }
+
#[allow(deprecated)]
fn into_builder(self) -> Self::Builder {
GcsBuilder {
@@ -307,7 +322,7 @@ impl Builder for GcsBuilder {
core: Arc::new(GcsCore {
info: {
let am = AccessorInfo::default();
- am.set_scheme(DEFAULT_SCHEME)
+ am.set_scheme(GCS_SCHEME)
.set_root(&root)
.set_name(bucket)
.set_native_capability(Capability {
@@ -497,3 +512,22 @@ impl Access for GcsBackend {
)))
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::Configurator;
+ use crate::types::OperatorUri;
+
+ #[test]
+ fn from_uri_extracts_bucket_and_root() {
+ let uri = OperatorUri::new(
+ "gcs://example-bucket/path/to/root".parse().unwrap(),
+ Vec::<(String, String)>::new(),
+ )
+ .unwrap();
+ let cfg = GcsConfig::from_uri(&uri).unwrap();
+ assert_eq!(cfg.bucket, "example-bucket");
+ assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+ }
+}
diff --git a/core/src/services/gcs/mod.rs b/core/src/services/gcs/mod.rs
index a4d5d9712..0c8427158 100644
--- a/core/src/services/gcs/mod.rs
+++ b/core/src/services/gcs/mod.rs
@@ -17,7 +17,7 @@
/// Default scheme for gcs service.
#[cfg(feature = "services-gcs")]
-pub(super) const DEFAULT_SCHEME: &str = "gcs";
+pub const GCS_SCHEME: &str = "gcs";
#[cfg(feature = "services-gcs")]
mod core;
#[cfg(feature = "services-gcs")]
diff --git a/core/src/services/memory/backend.rs b/core/src/services/memory/backend.rs
index 9def58cce..1e65f5419 100644
--- a/core/src/services/memory/backend.rs
+++ b/core/src/services/memory/backend.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
@@ -27,19 +26,16 @@ use super::writer::MemoryWriter;
use crate::raw::oio;
use crate::raw::*;
use crate::services::MemoryConfig;
+use crate::types::OperatorUri;
use crate::*;
-use http::Uri;
-use percent_encoding::percent_decode_str;
impl Configurator for MemoryConfig {
type Builder = MemoryBuilder;
- fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
- let mut map = options.clone();
-
+ fn from_uri(uri: &OperatorUri) -> Result<Self> {
+ let mut map = uri.options().clone();
if !map.contains_key("root") {
- let path = percent_decode_str(uri.path()).decode_utf8_lossy();
- if !path.is_empty() && path != "/" {
- map.insert("root".to_string(),
path.trim_start_matches('/').to_string());
+ if let Some(root) = uri.root().filter(|v| !v.is_empty()) {
+ map.insert("root".to_string(), root.to_string());
}
}
@@ -194,6 +190,8 @@ impl Access for MemoryAccessor {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::Configurator;
+ use crate::types::OperatorUri;
#[test]
fn test_accessor_metadata_name() {
@@ -203,4 +201,15 @@ mod tests {
let b2 = MemoryBuilder::default().build().unwrap();
assert_ne!(b1.info().name(), b2.info().name())
}
+
+ #[test]
+ fn from_uri_extracts_root() {
+ let uri = OperatorUri::new(
+ "memory://localhost/path/to/root".parse().unwrap(),
+ Vec::<(String, String)>::new(),
+ )
+ .unwrap();
+ let cfg = MemoryConfig::from_uri(&uri).unwrap();
+ assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+ }
}
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index bac66cc60..71d49ceaa 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -28,7 +28,7 @@ use reqsign::HuaweicloudObsConfig;
use reqsign::HuaweicloudObsCredentialLoader;
use reqsign::HuaweicloudObsSigner;
-use super::DEFAULT_SCHEME;
+use super::OBS_SCHEME;
use super::core::ObsCore;
use super::core::constants;
use super::delete::ObsDeleter;
@@ -38,10 +38,25 @@ use super::writer::ObsWriter;
use super::writer::ObsWriters;
use crate::raw::*;
use crate::services::ObsConfig;
+use crate::types::OperatorUri;
use crate::*;
impl Configurator for ObsConfig {
type Builder = ObsBuilder;
+ fn from_uri(uri: &OperatorUri) -> Result<Self> {
+ let mut map = uri.options().clone();
+
+ if let Some(name) = uri.name() {
+ map.insert("bucket".to_string(), name.to_string());
+ }
+
+ if let Some(root) = uri.root() {
+ map.insert("root".to_string(), root.to_string());
+ }
+
+ Self::from_iter(map)
+ }
+
#[allow(deprecated)]
fn into_builder(self) -> Self::Builder {
ObsBuilder {
@@ -226,7 +241,7 @@ impl Builder for ObsBuilder {
core: Arc::new(ObsCore {
info: {
let am = AccessorInfo::default();
- am.set_scheme(DEFAULT_SCHEME)
+ am.set_scheme(OBS_SCHEME)
.set_root(&root)
.set_name(&bucket)
.set_native_capability(Capability {
@@ -434,3 +449,22 @@ impl Access for ObsBackend {
)))
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::Configurator;
+ use crate::types::OperatorUri;
+
+ #[test]
+ fn from_uri_extracts_bucket_and_root() {
+ let uri = OperatorUri::new(
+ "obs://example-bucket/path/to/root".parse().unwrap(),
+ Vec::<(String, String)>::new(),
+ )
+ .unwrap();
+ let cfg = ObsConfig::from_uri(&uri).unwrap();
+ assert_eq!(cfg.bucket.as_deref(), Some("example-bucket"));
+ assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+ }
+}
diff --git a/core/src/services/obs/mod.rs b/core/src/services/obs/mod.rs
index ed94697a1..c33719f6a 100644
--- a/core/src/services/obs/mod.rs
+++ b/core/src/services/obs/mod.rs
@@ -17,7 +17,7 @@
/// Default scheme for obs service.
#[cfg(feature = "services-obs")]
-pub(super) const DEFAULT_SCHEME: &str = "obs";
+pub const OBS_SCHEME: &str = "obs";
#[cfg(feature = "services-obs")]
mod core;
#[cfg(feature = "services-obs")]
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 4d0d8cb62..0b2856c43 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -27,7 +27,7 @@ use reqsign::AliyunConfig;
use reqsign::AliyunLoader;
use reqsign::AliyunOssSigner;
-use super::DEFAULT_SCHEME;
+use super::OSS_SCHEME;
use super::core::*;
use super::delete::OssDeleter;
use super::error::parse_error;
@@ -38,12 +38,27 @@ use super::writer::OssWriter;
use super::writer::OssWriters;
use crate::raw::*;
use crate::services::OssConfig;
+use crate::types::OperatorUri;
use crate::*;
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
impl Configurator for OssConfig {
type Builder = OssBuilder;
+ fn from_uri(uri: &OperatorUri) -> Result<Self> {
+ let mut map = uri.options().clone();
+
+ if let Some(name) = uri.name() {
+ map.insert("bucket".to_string(), name.to_string());
+ }
+
+ if let Some(root) = uri.root() {
+ map.insert("root".to_string(), root.to_string());
+ }
+
+ Self::from_iter(map)
+ }
+
#[allow(deprecated)]
fn into_builder(self) -> Self::Builder {
OssBuilder {
@@ -511,7 +526,7 @@ impl Builder for OssBuilder {
core: Arc::new(OssCore {
info: {
let am = AccessorInfo::default();
- am.set_scheme(DEFAULT_SCHEME)
+ am.set_scheme(OSS_SCHEME)
.set_root(&root)
.set_name(bucket)
.set_native_capability(Capability {
@@ -732,3 +747,22 @@ impl Access for OssBackend {
)))
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::Configurator;
+ use crate::types::OperatorUri;
+
+ #[test]
+ fn from_uri_extracts_bucket_and_root() {
+ let uri = OperatorUri::new(
+ "oss://example-bucket/path/to/root".parse().unwrap(),
+ Vec::<(String, String)>::new(),
+ )
+ .unwrap();
+ let cfg = OssConfig::from_uri(&uri).unwrap();
+ assert_eq!(cfg.bucket, "example-bucket");
+ assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+ }
+}
diff --git a/core/src/services/oss/mod.rs b/core/src/services/oss/mod.rs
index e27e6a78b..e3e05c1b6 100644
--- a/core/src/services/oss/mod.rs
+++ b/core/src/services/oss/mod.rs
@@ -17,7 +17,7 @@
/// Default scheme for oss service.
#[cfg(feature = "services-oss")]
-pub(super) const DEFAULT_SCHEME: &str = "oss";
+pub const OSS_SCHEME: &str = "oss";
#[cfg(feature = "services-oss")]
mod core;
#[cfg(feature = "services-oss")]
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 4570ce6b0..309859dab 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -30,12 +30,10 @@ use constants::X_AMZ_META_PREFIX;
use constants::X_AMZ_VERSION_ID;
use http::Response;
use http::StatusCode;
-use http::Uri;
use log::debug;
use log::warn;
use md5::Digest;
use md5::Md5;
-use percent_encoding::percent_decode_str;
use reqsign::AwsAssumeRoleLoader;
use reqsign::AwsConfig;
use reqsign::AwsCredentialLoad;
@@ -56,6 +54,7 @@ use super::writer::S3Writers;
use crate::raw::oio::PageLister;
use crate::raw::*;
use crate::services::S3Config;
+use crate::types::OperatorUri;
use crate::*;
/// Allow constructing correct region endpoint if user gives a global endpoint.
@@ -74,25 +73,15 @@ const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
impl Configurator for S3Config {
type Builder = S3Builder;
- fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
- let mut map = options.clone();
-
- let bucket_missing = map.get("bucket").map(|v|
v.is_empty()).unwrap_or(true);
- if bucket_missing {
- let bucket = uri
- .authority()
- .map(|authority| authority.host())
- .filter(|host| !host.is_empty())
- .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "s3 uri
requires bucket"))?;
- map.insert("bucket".to_string(), bucket.to_string());
+ fn from_uri(uri: &OperatorUri) -> Result<Self> {
+ let mut map = uri.options().clone();
+
+ if let Some(name) = uri.name() {
+ map.insert("bucket".to_string(), name.to_string());
}
- if !map.contains_key("root") {
- let path = percent_decode_str(uri.path()).decode_utf8_lossy();
- let trimmed = path.trim_matches('/');
- if !trimmed.is_empty() {
- map.insert("root".to_string(), trimmed.to_string());
- }
+ if let Some(root) = uri.root() {
+ map.insert("root".to_string(), root.to_string());
}
Self::from_iter(map)
@@ -1188,6 +1177,8 @@ impl Access for S3Backend {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::Configurator;
+ use crate::types::OperatorUri;
#[test]
fn test_is_valid_bucket() {
@@ -1290,4 +1281,16 @@ mod tests {
assert_eq!(region.as_deref(), expected, "{name}");
}
}
+
+ #[test]
+ fn from_uri_extracts_bucket_and_root() {
+ let uri = OperatorUri::new(
+ "s3://example-bucket/path/to/root".parse().unwrap(),
+ Vec::<(String, String)>::new(),
+ )
+ .unwrap();
+ let cfg = S3Config::from_uri(&uri).unwrap();
+ assert_eq!(cfg.bucket, "example-bucket");
+ assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+ }
}
diff --git a/core/src/services/upyun/backend.rs b/core/src/services/upyun/backend.rs
index 5f03a7241..412011e49 100644
--- a/core/src/services/upyun/backend.rs
+++ b/core/src/services/upyun/backend.rs
@@ -23,7 +23,7 @@ use http::Response;
use http::StatusCode;
use log::debug;
-use super::DEFAULT_SCHEME;
+use super::UPYUN_SCHEME;
use super::core::*;
use super::delete::UpyunDeleter;
use super::error::parse_error;
@@ -32,10 +32,25 @@ use super::writer::UpyunWriter;
use super::writer::UpyunWriters;
use crate::raw::*;
use crate::services::UpyunConfig;
+use crate::types::OperatorUri;
use crate::*;
impl Configurator for UpyunConfig {
type Builder = UpyunBuilder;
+ fn from_uri(uri: &OperatorUri) -> Result<Self> {
+ let mut map = uri.options().clone();
+
+ if let Some(name) = uri.name() {
+ map.insert("bucket".to_string(), name.to_string());
+ }
+
+ if let Some(root) = uri.root() {
+ map.insert("root".to_string(), root.to_string());
+ }
+
+ Self::from_iter(map)
+ }
+
#[allow(deprecated)]
fn into_builder(self) -> Self::Builder {
UpyunBuilder {
@@ -169,7 +184,7 @@ impl Builder for UpyunBuilder {
core: Arc::new(UpyunCore {
info: {
let am = AccessorInfo::default();
- am.set_scheme(DEFAULT_SCHEME)
+ am.set_scheme(UPYUN_SCHEME)
.set_root(&root)
.set_native_capability(Capability {
stat: true,
@@ -315,3 +330,22 @@ impl Access for UpyunBackend {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::Configurator;
+ use crate::types::OperatorUri;
+
+ #[test]
+ fn from_uri_extracts_bucket_and_root() {
+ let uri = OperatorUri::new(
+ "upyun://example-bucket/path/to/root".parse().unwrap(),
+ Vec::<(String, String)>::new(),
+ )
+ .unwrap();
+ let cfg = UpyunConfig::from_uri(&uri).unwrap();
+ assert_eq!(cfg.bucket, "example-bucket");
+ assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
+ }
+}
diff --git a/core/src/services/upyun/mod.rs b/core/src/services/upyun/mod.rs
index eee374681..50db7493e 100644
--- a/core/src/services/upyun/mod.rs
+++ b/core/src/services/upyun/mod.rs
@@ -17,7 +17,7 @@
/// Default scheme for upyun service.
#[cfg(feature = "services-upyun")]
-pub(super) const DEFAULT_SCHEME: &str = "upyun";
+pub const UPYUN_SCHEME: &str = "upyun";
#[cfg(feature = "services-upyun")]
mod core;
#[cfg(feature = "services-upyun")]
diff --git a/core/src/types/builder.rs b/core/src/types/builder.rs
index c91363ed7..903459775 100644
--- a/core/src/types/builder.rs
+++ b/core/src/types/builder.rs
@@ -15,14 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
use std::fmt::Debug;
-use http::Uri;
use serde::Serialize;
use serde::de::DeserializeOwned;
use crate::raw::*;
+use crate::types::OperatorUri;
use crate::*;
/// Builder is used to set up underlying services.
@@ -125,8 +124,8 @@ pub trait Configurator: Serialize + DeserializeOwned + Debug + 'static {
/// Associated builder for this configuration.
type Builder: Builder;
- /// Build configuration from a URI plus merged options.
- fn from_uri(_uri: &Uri, _options: &HashMap<String, String>) ->
Result<Self> {
+ /// Build configuration from a parsed URI plus merged options.
+ fn from_uri(_uri: &OperatorUri) -> Result<Self> {
Err(Error::new(ErrorKind::Unsupported, "uri is not supported"))
}
diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs
index b57155646..79f8daf36 100644
--- a/core/src/types/mod.rs
+++ b/core/src/types/mod.rs
@@ -44,11 +44,13 @@ pub use execute::*;
mod operator;
pub use operator::DEFAULT_OPERATOR_REGISTRY;
+pub use operator::IntoOperatorUri;
pub use operator::Operator;
pub use operator::OperatorBuilder;
pub use operator::OperatorFactory;
pub use operator::OperatorInfo;
pub use operator::OperatorRegistry;
+pub use operator::OperatorUri;
pub use operator::operator_futures;
mod builder;
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 22a10e274..e2628d828 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
use crate::layers::*;
use crate::raw::*;
+use crate::types::IntoOperatorUri;
use crate::*;
/// # Operator build API
@@ -141,16 +142,13 @@ impl Operator {
/// use opendal::Operator;
///
/// # fn example() -> Result<()> {
- /// let op = Operator::from_uri("memory://localhost/", [])?;
+ /// let op = Operator::from_uri("memory://localhost/")?;
/// # let _ = op;
/// # Ok(())
/// # }
/// ```
- pub fn from_uri(
- uri: &str,
- options: impl IntoIterator<Item = (String, String)>,
- ) -> Result<Operator> {
- crate::DEFAULT_OPERATOR_REGISTRY.load(uri, options)
+ pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Operator> {
+ crate::DEFAULT_OPERATOR_REGISTRY.load(uri)
}
/// Create a new operator via given scheme and iterator of config value in dynamic dispatch.
diff --git a/core/src/types/operator/mod.rs b/core/src/types/operator/mod.rs
index c42f6d281..3de1ea7c1 100644
--- a/core/src/types/operator/mod.rs
+++ b/core/src/types/operator/mod.rs
@@ -31,3 +31,6 @@ pub mod operator_futures;
mod registry;
pub use registry::{DEFAULT_OPERATOR_REGISTRY, OperatorFactory,
OperatorRegistry};
+
+mod uri;
+pub use uri::{IntoOperatorUri, OperatorUri};
diff --git a/core/src/types/operator/registry.rs b/core/src/types/operator/registry.rs
index 7f11c003f..006c6d843 100644
--- a/core/src/types/operator/registry.rs
+++ b/core/src/types/operator/registry.rs
@@ -18,15 +18,13 @@
use std::collections::HashMap;
use std::sync::{LazyLock, Mutex};
-use http::Uri;
-use percent_encoding::percent_decode_str;
-
use crate::services;
use crate::types::builder::{Builder, Configurator};
+use crate::types::{IntoOperatorUri, OperatorUri};
use crate::{Error, ErrorKind, Operator, Result};
-/// Factory signature used to construct [`Operator`] from a URI and extra options.
-pub type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
+/// Factory signature used to construct [`Operator`] from a parsed [`OperatorUri`],
+/// which already carries the merged query-string and extra options.
+pub type OperatorFactory = fn(&OperatorUri) -> Result<Operator>;
/// Default registry initialized with builtin services.
pub static DEFAULT_OPERATOR_REGISTRY: LazyLock<OperatorRegistry> =
LazyLock::new(|| {
@@ -60,33 +58,22 @@ impl OperatorRegistry {
}
/// Load an [`Operator`] via the factory registered for the URI's scheme.
+ ///
+ /// # Errors
+ ///
+ /// Propagates any error from [`IntoOperatorUri::into_operator_uri`] (the
+ /// built-in conversions report `ConfigInvalid` for unparsable URIs or a
+ /// missing scheme) and returns `Unsupported` when no factory is registered
+ /// for the scheme.
+ ///
+ /// # Panics
+ ///
+ /// Panics if the registry mutex has been poisoned.
- pub fn load(
- &self,
- uri: &str,
- options: impl IntoIterator<Item = (String, String)>,
- ) -> Result<Operator> {
- let parsed = uri.parse::<Uri>().map_err(|err| {
- Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
- })?;
-
- let scheme = parsed
- .scheme_str()
- .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))?;
+ pub fn load(&self, uri: impl IntoOperatorUri) -> Result<Operator> {
+ let parsed = uri.into_operator_uri()?;
+ // `OperatorUri` lowercases the scheme when parsing, so it can be used as the
+ // registry key without further normalization.
+ let scheme = parsed.scheme();
- let key = scheme.to_ascii_lowercase();
let factory = self
.factories
.lock()
.expect("operator registry mutex poisoned")
- .get(key.as_str())
+ .get(scheme)
.copied()
.ok_or_else(|| {
Error::new(ErrorKind::Unsupported, "scheme is not registered")
- .with_context("scheme", scheme)
+ .with_context("scheme", scheme.to_string())
})?;
- let opts: Vec<(String, String)> = options.into_iter().collect();
- factory(uri, opts)
+ factory(&parsed)
}
}
@@ -97,33 +84,24 @@ fn register_builtin_services(registry: &OperatorRegistry) {
registry.register::<services::Fs>(services::FS_SCHEME);
#[cfg(feature = "services-s3")]
registry.register::<services::S3>(services::S3_SCHEME);
+ #[cfg(feature = "services-azblob")]
+ registry.register::<services::Azblob>(services::AZBLOB_SCHEME);
+ #[cfg(feature = "services-b2")]
+ registry.register::<services::B2>(services::B2_SCHEME);
+ #[cfg(feature = "services-cos")]
+ registry.register::<services::Cos>(services::COS_SCHEME);
+ #[cfg(feature = "services-gcs")]
+ registry.register::<services::Gcs>(services::GCS_SCHEME);
+ #[cfg(feature = "services-obs")]
+ registry.register::<services::Obs>(services::OBS_SCHEME);
+ #[cfg(feature = "services-oss")]
+ registry.register::<services::Oss>(services::OSS_SCHEME);
+ #[cfg(feature = "services-upyun")]
+ registry.register::<services::Upyun>(services::UPYUN_SCHEME);
}
/// Factory adapter that builds an operator from a configurator type.
-fn factory<C: Configurator>(uri: &str, options: Vec<(String, String)>) ->
Result<Operator> {
- let parsed = uri.parse::<Uri>().map_err(|err| {
- Error::new(ErrorKind::ConfigInvalid, "failed to parse
uri").set_source(err)
- })?;
-
- let mut params = HashMap::new();
- if let Some(query) = parsed.query() {
- for pair in query.split('&') {
- if pair.is_empty() {
- continue;
- }
- let mut parts = pair.splitn(2, '=');
- let key = parts.next().unwrap_or("");
- let value = parts.next().unwrap_or("");
- let key = percent_decode_str(key).decode_utf8_lossy().to_string();
- let value =
percent_decode_str(value).decode_utf8_lossy().to_string();
- params.insert(key.to_ascii_lowercase(), value);
- }
- }
-
- for (key, value) in options {
- params.insert(key.to_ascii_lowercase(), value);
- }
-
- let cfg = C::from_uri(&parsed, ¶ms)?;
+fn factory<C: Configurator>(uri: &OperatorUri) -> Result<Operator> {
+ let cfg = C::from_uri(uri)?;
Ok(Operator::from_config(cfg)?.finish())
}
diff --git a/core/src/types/operator/uri.rs b/core/src/types/operator/uri.rs
new file mode 100644
index 000000000..7cfc7900c
--- /dev/null
+++ b/core/src/types/operator/uri.rs
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use http::Uri;
+use percent_encoding::percent_decode_str;
+
+use crate::{Error, ErrorKind, Result};
+
+/// Parsed representation of an operator URI with normalized components.
+///
+/// For example, `s3://example-bucket/photos/2024?region=us-east-1` has the
+/// scheme `s3`, the name `example-bucket`, the root `photos/2024` and a single
+/// option `region = us-east-1`.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct OperatorUri {
+    /// URI scheme, lowercased.
+    scheme: String,
+    /// Host of the URI authority, when present and non-empty.
+    name: Option<String>,
+    /// Percent-decoded URI path without leading/trailing `/`, when non-empty.
+    root: Option<String>,
+    /// Query-string options merged with extra options; keys are lowercased.
+    options: HashMap<String, String>,
+}
+
+impl OperatorUri {
+    /// Build [`OperatorUri`] from a [`Uri`] plus additional options.
+    ///
+    /// Query-string pairs are percent-decoded. Pairs with an empty key are
+    /// ignored, and a pair without `=` gets an empty value. Entries from
+    /// `extra_options` are applied afterwards, so they override query-string
+    /// values with the same key. All option keys are lowercased; values are
+    /// kept as given.
+    ///
+    /// # Errors
+    ///
+    /// Returns an [`ErrorKind::ConfigInvalid`] error if `uri` has no scheme.
+    pub fn new(
+        uri: Uri,
+        extra_options: impl IntoIterator<Item = (String, String)>,
+    ) -> Result<Self> {
+        let scheme = uri
+            .scheme_str()
+            .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))?
+            .to_ascii_lowercase();
+
+        let mut options = HashMap::<String, String>::new();
+
+        if let Some(query) = uri.query() {
+            for pair in query.split('&') {
+                // Split on the first `=` only; further `=` characters stay in the value.
+                let (raw_key, raw_value) = pair.split_once('=').unwrap_or((pair, ""));
+                let key = percent_decode_str(raw_key)
+                    .decode_utf8_lossy()
+                    .to_ascii_lowercase();
+                // Covers empty segments (`a=1&&b=2`) as well as `=value`, which
+                // cannot name any configuration field.
+                if key.is_empty() {
+                    continue;
+                }
+                let value = percent_decode_str(raw_value)
+                    .decode_utf8_lossy()
+                    .into_owned();
+                options.insert(key, value);
+            }
+        }
+
+        // Explicit options take precedence over query-string values.
+        options.extend(
+            extra_options
+                .into_iter()
+                .map(|(key, value)| (key.to_ascii_lowercase(), value)),
+        );
+
+        let name = uri
+            .authority()
+            .map(|authority| authority.host())
+            .filter(|host| !host.is_empty())
+            .map(str::to_string);
+
+        let decoded_path = percent_decode_str(uri.path()).decode_utf8_lossy();
+        let root = match decoded_path.trim_matches('/') {
+            "" => None,
+            trimmed => Some(trimmed.to_string()),
+        };
+
+        Ok(Self {
+            scheme,
+            name,
+            root,
+            options,
+        })
+    }
+
+    /// Normalized scheme in lowercase.
+    pub fn scheme(&self) -> &str {
+        &self.scheme
+    }
+
+    /// Name taken from the host of the URI authority (e.g. the bucket in
+    /// `s3://bucket/...`), if present and non-empty.
+    pub fn name(&self) -> Option<&str> {
+        self.name.as_deref()
+    }
+
+    /// Root path extracted from the URI path, percent-decoded and without
+    /// leading or trailing slashes; `None` when the path is empty or `/`.
+    pub fn root(&self) -> Option<&str> {
+        self.root.as_deref()
+    }
+
+    /// Options merged from the query string and the extra options passed to
+    /// [`OperatorUri::new`], keyed by lowercased option name. Extra options
+    /// override query-string values that have the same key.
+    pub fn options(&self) -> &HashMap<String, String> {
+        &self.options
+    }
+}
+
+/// Conversion trait that builds [`OperatorUri`] from various inputs.
+///
+/// Implemented for [`OperatorUri`] and [`Uri`] (owned or borrowed), for `&str`
+/// and `String` URIs, and for `(uri, options)` tuples whose option pairs are
+/// merged on top of the URI's query-string options.
+pub trait IntoOperatorUri {
+ /// Convert the input into an [`OperatorUri`].
+ ///
+ /// # Errors
+ ///
+ /// The built-in implementations return [`ErrorKind::ConfigInvalid`] when
+ /// the input cannot be parsed as a URI or has no scheme.
+ fn into_operator_uri(self) -> Result<OperatorUri>;
+}
+
+impl IntoOperatorUri for OperatorUri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        Ok(self)
+    }
+}
+
+impl IntoOperatorUri for &OperatorUri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        Ok(self.clone())
+    }
+}
+
+impl IntoOperatorUri for Uri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        OperatorUri::new(self, std::iter::empty::<(String, String)>())
+    }
+}
+
+impl IntoOperatorUri for &Uri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        self.clone().into_operator_uri()
+    }
+}
+
+impl IntoOperatorUri for &str {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        parse_uri(self)?.into_operator_uri()
+    }
+}
+
+impl IntoOperatorUri for String {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        self.as_str().into_operator_uri()
+    }
+}
+
+/// Allows passing a borrowed `String` (e.g. `Operator::from_uri(&uri)`)
+/// without converting it to `&str` first.
+impl IntoOperatorUri for &String {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        self.as_str().into_operator_uri()
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (Uri, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (uri, extra) = self;
+        OperatorUri::new(uri, into_string_pairs(extra))
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (&Uri, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (uri, extra) = self;
+        (uri.clone(), extra).into_operator_uri()
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (&str, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (base, extra) = self;
+        OperatorUri::new(parse_uri(base)?, into_string_pairs(extra))
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (String, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (base, extra) = self;
+        (base.as_str(), extra).into_operator_uri()
+    }
+}
+
+/// Parse `input` as a [`Uri`], reporting failures as [`ErrorKind::ConfigInvalid`].
+fn parse_uri(input: &str) -> Result<Uri> {
+    input.parse::<Uri>().map_err(|err| {
+        Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
+    })
+}
+
+/// Convert caller-supplied option pairs into the owned `(String, String)`
+/// pairs expected by [`OperatorUri::new`].
+fn into_string_pairs<O, K, V>(options: O) -> impl Iterator<Item = (String, String)>
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    options
+        .into_iter()
+        .map(|(key, value)| (key.into(), value.into()))
+}
+
+#[cfg(test)]
+mod tests {
+    // `super::*` already brings `IntoOperatorUri`, `ErrorKind` and `Uri` into scope.
+    use super::*;
+
+    #[test]
+    fn parse_uri_with_name_and_root() {
+        let uri = OperatorUri::new(
+            "s3://example-bucket/photos/2024".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+
+        assert_eq!(uri.scheme(), "s3");
+        assert_eq!(uri.name(), Some("example-bucket"));
+        assert_eq!(uri.root(), Some("photos/2024"));
+        assert!(uri.options().is_empty());
+    }
+
+    #[test]
+    fn into_operator_uri_merges_extra_options() {
+        let uri = (
+            "s3://bucket/path?region=us-east-1",
+            vec![("region", "override"), ("endpoint", "https://custom")],
+        )
+            .into_operator_uri()
+            .unwrap();
+
+        assert_eq!(uri.scheme(), "s3");
+        assert_eq!(uri.name(), Some("bucket"));
+        assert_eq!(uri.root(), Some("path"));
+        assert_eq!(
+            uri.options().get("region").map(String::as_str),
+            Some("override")
+        );
+        assert_eq!(
+            uri.options().get("endpoint").map(String::as_str),
+            Some("https://custom")
+        );
+    }
+
+    #[test]
+    fn scheme_and_option_keys_are_lowercased() {
+        let uri = (
+            "S3://bucket?Region=us-east-1",
+            vec![("Endpoint", "https://custom")],
+        )
+            .into_operator_uri()
+            .unwrap();
+
+        assert_eq!(uri.scheme(), "s3");
+        assert_eq!(uri.root(), None);
+        assert_eq!(
+            uri.options().get("region").map(String::as_str),
+            Some("us-east-1")
+        );
+        assert_eq!(
+            uri.options().get("endpoint").map(String::as_str),
+            Some("https://custom")
+        );
+    }
+
+    #[test]
+    fn path_and_query_are_percent_decoded() {
+        let uri = String::from("s3://bucket/a%20b/c/?endpoint=http%3A%2F%2Flocalhost%3A9000")
+            .into_operator_uri()
+            .unwrap();
+
+        assert_eq!(uri.root(), Some("a b/c"));
+        assert_eq!(
+            uri.options().get("endpoint").map(String::as_str),
+            Some("http://localhost:9000")
+        );
+    }
+
+    #[test]
+    fn empty_path_has_no_root() {
+        let uri = "s3://bucket/".into_operator_uri().unwrap();
+
+        assert_eq!(uri.name(), Some("bucket"));
+        assert_eq!(uri.root(), None);
+    }
+
+    #[test]
+    fn missing_scheme_is_config_invalid() {
+        let err = "/only/a/path".into_operator_uri().unwrap_err();
+
+        assert_eq!(err.kind(), ErrorKind::ConfigInvalid);
+    }
+}