blackmwk commented on code in PR #2231:
URL: https://github.com/apache/iceberg-rust/pull/2231#discussion_r2944182233


##########
crates/storage/opendal/src/resolving.rs:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Resolving storage that auto-detects the scheme from a path and delegates
+//! to the appropriate [`OpenDalStorage`] variant.
+
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use iceberg::io::{
+    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, 
StorageConfig,
+    StorageFactory,
+};
+use iceberg::{Error, ErrorKind, Result};
+use serde::{Deserialize, Serialize};
+use url::Url;
+
+use crate::OpenDalStorage;
+#[cfg(feature = "opendal-s3")]
+use crate::s3::CustomAwsCredentialLoader;
+
+/// Extract the scheme from a path string (e.g., `"s3://bucket/key"` → `"s3"`).
+fn extract_scheme(path: &str) -> Result<String> {
+    let url = Url::parse(path).map_err(|e| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!("Invalid path: {path}, failed to parse URL: {e}"),
+        )
+    })?;
+    Ok(url.scheme().to_string())
+}
+
+/// Build an [`OpenDalStorage`] variant for the given scheme and config 
properties.
+fn build_storage_for_scheme(
+    scheme: &str,
+    props: &HashMap<String, String>,
+    #[cfg(feature = "opendal-s3")] customized_credential_load: 
&Option<CustomAwsCredentialLoader>,
+) -> Result<OpenDalStorage> {
+    match scheme {
+        #[cfg(feature = "opendal-s3")]
+        "s3" | "s3a" | "s3n" => {
+            let config = crate::s3::s3_config_parse(props.clone())?;
+            Ok(OpenDalStorage::S3 {
+                configured_scheme: scheme.to_string(),
+                config: Arc::new(config),
+                customized_credential_load: customized_credential_load.clone(),
+            })
+        }
+        #[cfg(feature = "opendal-gcs")]
+        "gs" => {
+            let config = crate::gcs::gcs_config_parse(props.clone())?;
+            Ok(OpenDalStorage::Gcs {
+                config: Arc::new(config),
+            })
+        }
+        #[cfg(feature = "opendal-oss")]
+        "oss" => {
+            let config = crate::oss::oss_config_parse(props.clone())?;
+            Ok(OpenDalStorage::Oss {
+                config: Arc::new(config),
+            })
+        }
+        #[cfg(feature = "opendal-azdls")]
+        "abfs" | "abfss" | "wasb" | "wasbs" => {
+            let configured_scheme: crate::azdls::AzureStorageScheme = 
scheme.parse()?;
+            let config = crate::azdls::azdls_config_parse(props.clone())?;
+            Ok(OpenDalStorage::Azdls {
+                configured_scheme,
+                config: Arc::new(config),
+            })
+        }
+        #[cfg(feature = "opendal-fs")]
+        "file" => Ok(OpenDalStorage::LocalFs),
+        #[cfg(feature = "opendal-memory")]
+        "memory" => 
Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
+        _ => Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            format!("Unsupported storage scheme: {scheme}"),
+        )),
+    }
+}
+
+/// A resolving storage factory that creates [`OpenDalResolvingStorage`] 
instances.
+///
+/// This factory accepts paths from any supported storage system and 
dynamically
+/// delegates operations to the appropriate [`OpenDalStorage`] variant based on
+/// the path scheme.
+///
+/// # Example
+///
+/// ```rust,ignore
+/// use std::sync::Arc;
+/// use iceberg::io::FileIOBuilder;
+/// use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
+///
+/// let factory = OpenDalResolvingStorageFactory::new();
+/// let file_io = FileIOBuilder::new(Arc::new(factory))
+///     .with_prop("s3.region", "us-east-1")
+///     .build();
+/// ```
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct OpenDalResolvingStorageFactory {
+    /// Custom AWS credential loader for S3 storage.
+    #[cfg(feature = "opendal-s3")]
+    #[serde(skip)]
+    customized_credential_load: Option<CustomAwsCredentialLoader>,
+}
+
+impl Default for OpenDalResolvingStorageFactory {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl OpenDalResolvingStorageFactory {
+    /// Create a new resolving storage factory.
+    pub fn new() -> Self {
+        Self {
+            #[cfg(feature = "opendal-s3")]
+            customized_credential_load: None,
+        }
+    }
+
+    /// Set a custom AWS credential loader for S3 storage.
+    #[cfg(feature = "opendal-s3")]
+    pub fn with_s3_credential_loader(mut self, loader: 
CustomAwsCredentialLoader) -> Self {
+        self.customized_credential_load = Some(loader);
+        self
+    }
+}
+
+#[typetag::serde]
+impl StorageFactory for OpenDalResolvingStorageFactory {
+    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+        Ok(Arc::new(OpenDalResolvingStorage {
+            props: config.props().clone(),
+            storages: RwLock::new(HashMap::new()),
+            #[cfg(feature = "opendal-s3")]
+            customized_credential_load: 
self.customized_credential_load.clone(),
+        }))
+    }
+}
+
+/// A resolving storage that auto-detects the scheme from a path and delegates
+/// to the appropriate [`OpenDalStorage`] variant.
+///
+/// Sub-storages are lazily created on first use for each scheme and cached
+/// for subsequent operations.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct OpenDalResolvingStorage {
+    /// Configuration properties shared across all backends.
+    props: HashMap<String, String>,
+    /// Cache of scheme → storage mappings.
+    #[serde(skip, default)]
+    storages: RwLock<HashMap<String, Arc<OpenDalStorage>>>,

Review Comment:
   IIRC the `configured_scheme` was a legacy setting from before we refactored 
the `Storage` trait. I think we no longer need this field since the `Storage` now 
accepts the full URL. Please create an issue to track it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to