This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ffd6454fe feat(storage): implement opendal resolving storage (#2231)
ffd6454fe is described below
commit ffd6454fe283e0815593d2fb93169ea36b690174
Author: Shawn Chang <[email protected]>
AuthorDate: Mon Mar 16 20:46:31 2026 -0700
feat(storage): implement opendal resolving storage (#2231)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements, as this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #2210
## What changes are included in this PR?
- Add OpenDalResolvingStorage
<!--
Provide a summary of the modifications in this PR. List the main changes
such as new features, bug fixes, refactoring, or any other updates.
-->
## Are these changes tested?
Added a new test
<!--
Specify what the tests cover (unit test, integration test, etc.).
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
---
crates/storage/opendal/src/lib.rs | 3 +
crates/storage/opendal/src/resolving.rs | 319 +++++++++++++++++++++
.../opendal/tests/resolving_storage_test.rs | 297 +++++++++++++++++++
3 files changed, 619 insertions(+)
diff --git a/crates/storage/opendal/src/lib.rs
b/crates/storage/opendal/src/lib.rs
index 7c11f80ad..816068052 100644
--- a/crates/storage/opendal/src/lib.rs
+++ b/crates/storage/opendal/src/lib.rs
@@ -90,6 +90,9 @@ cfg_if! {
}
}
+mod resolving;
+pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
+
/// OpenDAL-based storage factory.
///
/// Maps scheme to the corresponding OpenDalStorage storage variant.
diff --git a/crates/storage/opendal/src/resolving.rs
b/crates/storage/opendal/src/resolving.rs
new file mode 100644
index 000000000..7c06cf96a
--- /dev/null
+++ b/crates/storage/opendal/src/resolving.rs
@@ -0,0 +1,319 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Resolving storage that auto-detects the scheme from a path and delegates
+//! to the appropriate [`OpenDalStorage`] variant.
+
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::StreamExt;
+use futures::stream::BoxStream;
+use iceberg::io::{
+ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
StorageConfig,
+ StorageFactory,
+};
+use iceberg::{Error, ErrorKind, Result};
+use opendal::Scheme;
+use serde::{Deserialize, Serialize};
+use url::Url;
+
+use crate::OpenDalStorage;
+#[cfg(feature = "opendal-s3")]
+use crate::s3::CustomAwsCredentialLoader;
+
+/// Schemes supported by OpenDalResolvingStorage
+pub const SCHEME_MEMORY: &str = "memory";
+pub const SCHEME_FILE: &str = "file";
+pub const SCHEME_S3: &str = "s3";
+pub const SCHEME_S3A: &str = "s3a";
+pub const SCHEME_S3N: &str = "s3n";
+pub const SCHEME_GS: &str = "gs";
+pub const SCHEME_GCS: &str = "gcs";
+pub const SCHEME_OSS: &str = "oss";
+pub const SCHEME_ABFSS: &str = "abfss";
+pub const SCHEME_ABFS: &str = "abfs";
+pub const SCHEME_WASBS: &str = "wasbs";
+pub const SCHEME_WASB: &str = "wasb";
+
+/// Parse a URL scheme string into an [`opendal::Scheme`].
+fn parse_scheme(scheme: &str) -> Result<Scheme> {
+ match scheme {
+ SCHEME_MEMORY => Ok(Scheme::Memory),
+ SCHEME_FILE | "" => Ok(Scheme::Fs),
+ SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok(Scheme::S3),
+ SCHEME_GS | SCHEME_GCS => Ok(Scheme::Gcs),
+ SCHEME_OSS => Ok(Scheme::Oss),
+ SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB =>
Ok(Scheme::Azdls),
+ s => s.parse::<Scheme>().map_err(|e| {
+ Error::new(
+ ErrorKind::FeatureUnsupported,
+ format!("Unsupported storage scheme: {s}: {e}"),
+ )
+ }),
+ }
+}
+
+/// Extract the scheme string from a path URL.
+fn extract_scheme(path: &str) -> Result<String> {
+ let url = Url::parse(path).map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid path: {path}, failed to parse URL: {e}"),
+ )
+ })?;
+ Ok(url.scheme().to_string())
+}
+
+/// Build an [`OpenDalStorage`] variant for the given scheme and config
properties.
+fn build_storage_for_scheme(
+ scheme: &str,
+ props: &HashMap<String, String>,
+ #[cfg(feature = "opendal-s3")] customized_credential_load:
&Option<CustomAwsCredentialLoader>,
+) -> Result<OpenDalStorage> {
+ match parse_scheme(scheme)? {
+ #[cfg(feature = "opendal-s3")]
+ Scheme::S3 => {
+ let config = crate::s3::s3_config_parse(props.clone())?;
+ Ok(OpenDalStorage::S3 {
+ configured_scheme: scheme.to_string(),
+ config: Arc::new(config),
+ customized_credential_load: customized_credential_load.clone(),
+ })
+ }
+ #[cfg(feature = "opendal-gcs")]
+ Scheme::Gcs => {
+ let config = crate::gcs::gcs_config_parse(props.clone())?;
+ Ok(OpenDalStorage::Gcs {
+ config: Arc::new(config),
+ })
+ }
+ #[cfg(feature = "opendal-oss")]
+ Scheme::Oss => {
+ let config = crate::oss::oss_config_parse(props.clone())?;
+ Ok(OpenDalStorage::Oss {
+ config: Arc::new(config),
+ })
+ }
+ #[cfg(feature = "opendal-azdls")]
+ Scheme::Azdls => {
+ let configured_scheme: crate::azdls::AzureStorageScheme =
scheme.parse()?;
+ let config = crate::azdls::azdls_config_parse(props.clone())?;
+ Ok(OpenDalStorage::Azdls {
+ configured_scheme,
+ config: Arc::new(config),
+ })
+ }
+ #[cfg(feature = "opendal-fs")]
+ Scheme::Fs => Ok(OpenDalStorage::LocalFs),
+ #[cfg(feature = "opendal-memory")]
+ Scheme::Memory =>
Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
+ unsupported => Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ format!("Unsupported storage scheme: {unsupported}"),
+ )),
+ }
+}
+
+/// A resolving storage factory that creates [`OpenDalResolvingStorage`]
instances.
+///
+/// This factory accepts paths from any supported storage system and
dynamically
+/// delegates operations to the appropriate [`OpenDalStorage`] variant based on
+/// the path scheme.
+///
+/// # Example
+///
+/// ```rust,ignore
+/// use std::sync::Arc;
+/// use iceberg::io::FileIOBuilder;
+/// use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
+///
+/// let factory = OpenDalResolvingStorageFactory::new();
+/// let file_io = FileIOBuilder::new(Arc::new(factory))
+/// .with_prop("s3.region", "us-east-1")
+/// .build();
+/// ```
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct OpenDalResolvingStorageFactory {
+ /// Custom AWS credential loader for S3 storage.
+ #[cfg(feature = "opendal-s3")]
+ #[serde(skip)]
+ customized_credential_load: Option<CustomAwsCredentialLoader>,
+}
+
+impl Default for OpenDalResolvingStorageFactory {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl OpenDalResolvingStorageFactory {
+ /// Create a new resolving storage factory.
+ pub fn new() -> Self {
+ Self {
+ #[cfg(feature = "opendal-s3")]
+ customized_credential_load: None,
+ }
+ }
+
+ /// Set a custom AWS credential loader for S3 storage.
+ #[cfg(feature = "opendal-s3")]
+ pub fn with_s3_credential_loader(mut self, loader:
CustomAwsCredentialLoader) -> Self {
+ self.customized_credential_load = Some(loader);
+ self
+ }
+}
+
+#[typetag::serde]
+impl StorageFactory for OpenDalResolvingStorageFactory {
+ fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+ Ok(Arc::new(OpenDalResolvingStorage {
+ props: config.props().clone(),
+ storages: RwLock::new(HashMap::new()),
+ #[cfg(feature = "opendal-s3")]
+ customized_credential_load:
self.customized_credential_load.clone(),
+ }))
+ }
+}
+
+/// A resolving storage that auto-detects the scheme from a path and delegates
+/// to the appropriate [`OpenDalStorage`] variant.
+///
+/// Sub-storages are lazily created on first use for each scheme and cached
+/// for subsequent operations.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct OpenDalResolvingStorage {
+ /// Configuration properties shared across all backends.
+ props: HashMap<String, String>,
+ /// Cache of scheme → storage mappings.
+ #[serde(skip, default)]
+ storages: RwLock<HashMap<String, Arc<OpenDalStorage>>>,
+ /// Custom AWS credential loader for S3 storage.
+ #[cfg(feature = "opendal-s3")]
+ #[serde(skip)]
+ customized_credential_load: Option<CustomAwsCredentialLoader>,
+}
+
+impl OpenDalResolvingStorage {
+ /// Resolve the storage for the given path by extracting the scheme and
+ /// returning the cached or newly-created [`OpenDalStorage`].
+ fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
+ let scheme = extract_scheme(path)?;
+
+ // Fast path: check read lock first.
+ {
+ let cache = self
+ .storages
+ .read()
+ .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache
lock poisoned"))?;
+ if let Some(storage) = cache.get(&scheme) {
+ return Ok(storage.clone());
+ }
+ }
+
+ // Slow path: build and insert under write lock.
+ let mut cache = self
+ .storages
+ .write()
+ .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock
poisoned"))?;
+
+ // Double-check after acquiring write lock.
+ if let Some(storage) = cache.get(&scheme) {
+ return Ok(storage.clone());
+ }
+
+ let storage = build_storage_for_scheme(
+ &scheme,
+ &self.props,
+ #[cfg(feature = "opendal-s3")]
+ &self.customized_credential_load,
+ )?;
+ let storage = Arc::new(storage);
+ cache.insert(scheme, storage.clone());
+ Ok(storage)
+ }
+}
+
+#[async_trait]
+#[typetag::serde]
+impl Storage for OpenDalResolvingStorage {
+ async fn exists(&self, path: &str) -> Result<bool> {
+ self.resolve(path)?.exists(path).await
+ }
+
+ async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+ self.resolve(path)?.metadata(path).await
+ }
+
+ async fn read(&self, path: &str) -> Result<Bytes> {
+ self.resolve(path)?.read(path).await
+ }
+
+ async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+ self.resolve(path)?.reader(path).await
+ }
+
+ async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+ self.resolve(path)?.write(path, bs).await
+ }
+
+ async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+ self.resolve(path)?.writer(path).await
+ }
+
+ async fn delete(&self, path: &str) -> Result<()> {
+ self.resolve(path)?.delete(path).await
+ }
+
+ async fn delete_prefix(&self, path: &str) -> Result<()> {
+ self.resolve(path)?.delete_prefix(path).await
+ }
+
+ async fn delete_stream(&self, mut paths: BoxStream<'static, String>) ->
Result<()> {
+ // Group paths by scheme so each resolved storage receives a batch,
+ // avoiding repeated operator creation per path.
+ let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
+ while let Some(path) = paths.next().await {
+ let scheme = extract_scheme(&path)?;
+ grouped.entry(scheme).or_default().push(path);
+ }
+
+ for (_, paths) in grouped {
+ let storage = self.resolve(&paths[0])?;
+ storage
+ .delete_stream(futures::stream::iter(paths).boxed())
+ .await?;
+ }
+ Ok(())
+ }
+
+ fn new_input(&self, path: &str) -> Result<InputFile> {
+ Ok(InputFile::new(
+ Arc::new(self.resolve(path)?.as_ref().clone()),
+ path.to_string(),
+ ))
+ }
+
+ fn new_output(&self, path: &str) -> Result<OutputFile> {
+ Ok(OutputFile::new(
+ Arc::new(self.resolve(path)?.as_ref().clone()),
+ path.to_string(),
+ ))
+ }
+}
diff --git a/crates/storage/opendal/tests/resolving_storage_test.rs
b/crates/storage/opendal/tests/resolving_storage_test.rs
new file mode 100644
index 000000000..4572ad2c2
--- /dev/null
+++ b/crates/storage/opendal/tests/resolving_storage_test.rs
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for OpenDalResolvingStorage.
+//!
+//! These tests assume Docker containers are started externally via `make
docker-up`.
+//! Each test uses unique file paths based on module path to avoid conflicts.
+
+#[cfg(all(
+ feature = "opendal-s3",
+ feature = "opendal-fs",
+ feature = "opendal-memory"
+))]
+mod tests {
+ use std::sync::Arc;
+
+ use iceberg::io::{
+ FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
S3_SECRET_ACCESS_KEY,
+ };
+ use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
+ use iceberg_test_utils::{get_minio_endpoint,
normalize_test_name_with_parts, set_up};
+
+ fn get_resolving_file_io() -> iceberg::io::FileIO {
+ set_up();
+
+ let minio_endpoint = get_minio_endpoint();
+
+ FileIOBuilder::new(Arc::new(OpenDalResolvingStorageFactory::new()))
+ .with_props(vec![
+ (S3_ENDPOINT, minio_endpoint),
+ (S3_ACCESS_KEY_ID, "admin".to_string()),
+ (S3_SECRET_ACCESS_KEY, "password".to_string()),
+ (S3_REGION, "us-east-1".to_string()),
+ ])
+ .build()
+ }
+
+ fn temp_fs_path(name: &str) -> String {
+ let dir = std::env::temp_dir().join("iceberg_resolving_tests");
+ std::fs::create_dir_all(&dir).unwrap();
+ let path = dir.join(name);
+ // Clean up from previous runs
+ let _ = std::fs::remove_file(&path);
+ format!("file:/{}", path.display())
+ }
+
+ #[tokio::test]
+ async fn test_mixed_scheme_write_and_read() {
+ let file_io = get_resolving_file_io();
+
+ let s3_path = format!(
+ "s3://bucket1/{}",
+ normalize_test_name_with_parts!("test_mixed_scheme_write_and_read")
+ );
+ let fs_path = temp_fs_path("mixed_write_and_read.txt");
+ let mem_path = "memory://test_mixed_scheme_write_and_read";
+
+ // Write to all three schemes
+ file_io
+ .new_output(&s3_path)
+ .unwrap()
+ .write("from_s3".into())
+ .await
+ .unwrap();
+ file_io
+ .new_output(&fs_path)
+ .unwrap()
+ .write("from_fs".into())
+ .await
+ .unwrap();
+ file_io
+ .new_output(mem_path)
+ .unwrap()
+ .write("from_memory".into())
+ .await
+ .unwrap();
+
+ // Read back from all three
+ assert_eq!(
+ file_io.new_input(&s3_path).unwrap().read().await.unwrap(),
+ bytes::Bytes::from("from_s3")
+ );
+ assert_eq!(
+ file_io.new_input(&fs_path).unwrap().read().await.unwrap(),
+ bytes::Bytes::from("from_fs")
+ );
+ assert_eq!(
+ file_io.new_input(mem_path).unwrap().read().await.unwrap(),
+ bytes::Bytes::from("from_memory")
+ );
+ }
+
+ #[tokio::test]
+ async fn test_mixed_scheme_exists_independently() {
+ let file_io = get_resolving_file_io();
+
+ let s3_path = format!(
+ "s3://bucket1/{}",
+
normalize_test_name_with_parts!("test_mixed_scheme_exists_independently")
+ );
+ let fs_path = temp_fs_path("mixed_exists_independently.txt");
+ let mem_path = "memory://test_mixed_scheme_exists_independently";
+
+ // Clean up S3 from previous runs
+ let _ = file_io.delete(&s3_path).await;
+
+ // None exist initially
+ assert!(!file_io.exists(&s3_path).await.unwrap());
+ assert!(!file_io.exists(&fs_path).await.unwrap());
+ assert!(!file_io.exists(mem_path).await.unwrap());
+
+ // Write only to fs
+ file_io
+ .new_output(&fs_path)
+ .unwrap()
+ .write("fs_only".into())
+ .await
+ .unwrap();
+
+ // Only fs exists
+ assert!(!file_io.exists(&s3_path).await.unwrap());
+ assert!(file_io.exists(&fs_path).await.unwrap());
+ assert!(!file_io.exists(mem_path).await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_mixed_scheme_delete_one_keeps_others() {
+ let file_io = get_resolving_file_io();
+
+ let s3_path = format!(
+ "s3://bucket1/{}",
+
normalize_test_name_with_parts!("test_mixed_scheme_delete_one_keeps_others")
+ );
+ let fs_path = temp_fs_path("mixed_delete_one_keeps_others.txt");
+ let mem_path = "memory://test_mixed_scheme_delete_one_keeps_others";
+
+ // Write to all three
+ file_io
+ .new_output(&s3_path)
+ .unwrap()
+ .write("s3".into())
+ .await
+ .unwrap();
+ file_io
+ .new_output(&fs_path)
+ .unwrap()
+ .write("fs".into())
+ .await
+ .unwrap();
+ file_io
+ .new_output(mem_path)
+ .unwrap()
+ .write("mem".into())
+ .await
+ .unwrap();
+
+ // Delete only the fs file
+ file_io.delete(&fs_path).await.unwrap();
+
+ // fs gone, S3 and memory still there
+ assert!(file_io.exists(&s3_path).await.unwrap());
+ assert!(!file_io.exists(&fs_path).await.unwrap());
+ assert!(file_io.exists(mem_path).await.unwrap());
+
+ assert_eq!(
+ file_io.new_input(&s3_path).unwrap().read().await.unwrap(),
+ bytes::Bytes::from("s3")
+ );
+ assert_eq!(
+ file_io.new_input(mem_path).unwrap().read().await.unwrap(),
+ bytes::Bytes::from("mem")
+ );
+ }
+
+ #[tokio::test]
+ async fn test_mixed_scheme_interleaved_operations() {
+ let file_io = get_resolving_file_io();
+
+ let s3_path = format!(
+ "s3://bucket1/{}",
+ normalize_test_name_with_parts!("test_mixed_scheme_interleaved")
+ );
+ let fs_path = temp_fs_path("mixed_interleaved.txt");
+ let mem_path = "memory://test_mixed_scheme_interleaved";
+
+ // Interleave: write fs, write memory, write s3
+ file_io
+ .new_output(&fs_path)
+ .unwrap()
+ .write("fs_data".into())
+ .await
+ .unwrap();
+ file_io
+ .new_output(mem_path)
+ .unwrap()
+ .write("mem_data".into())
+ .await
+ .unwrap();
+ file_io
+ .new_output(&s3_path)
+ .unwrap()
+ .write("s3_data".into())
+ .await
+ .unwrap();
+
+ // Read in reverse order: s3, memory, fs
+ assert_eq!(
+ file_io.new_input(&s3_path).unwrap().read().await.unwrap(),
+ bytes::Bytes::from("s3_data")
+ );
+ assert_eq!(
+ file_io.new_input(mem_path).unwrap().read().await.unwrap(),
+ bytes::Bytes::from("mem_data")
+ );
+ assert_eq!(
+ file_io.new_input(&fs_path).unwrap().read().await.unwrap(),
+ bytes::Bytes::from("fs_data")
+ );
+ }
+
+ #[tokio::test]
+ async fn test_invalid_scheme() {
+ let file_io = get_resolving_file_io();
+ let result = file_io.exists("unknown://bucket/key").await;
+ assert!(result.is_err());
+ assert!(
+ result
+ .unwrap_err()
+ .to_string()
+ .contains("Unsupported storage scheme"),
+ );
+ }
+
+ #[tokio::test]
+ async fn test_missing_scheme() {
+ let file_io = get_resolving_file_io();
+ let result = file_io.exists("no-scheme-path").await;
+ assert!(result.is_err());
+ }
+
+ #[cfg(feature = "opendal-s3")]
+ #[tokio::test]
+ async fn test_with_custom_credential_loader() {
+ use async_trait::async_trait;
+ use iceberg_storage_opendal::CustomAwsCredentialLoader;
+ use reqsign::{AwsCredential, AwsCredentialLoad};
+ use reqwest::Client;
+
+ struct MinioCredentialLoader;
+
+ #[async_trait]
+ impl AwsCredentialLoad for MinioCredentialLoader {
+ async fn load_credential(
+ &self,
+ _client: Client,
+ ) -> anyhow::Result<Option<AwsCredential>> {
+ Ok(Some(AwsCredential {
+ access_key_id: "admin".to_string(),
+ secret_access_key: "password".to_string(),
+ session_token: None,
+ expires_in: None,
+ }))
+ }
+ }
+
+ set_up();
+ let minio_endpoint = get_minio_endpoint();
+
+ let factory =
OpenDalResolvingStorageFactory::new().with_s3_credential_loader(
+ CustomAwsCredentialLoader::new(Arc::new(MinioCredentialLoader)),
+ );
+
+ let file_io = FileIOBuilder::new(Arc::new(factory))
+ .with_props(vec![
+ (S3_ENDPOINT, minio_endpoint),
+ (S3_REGION, "us-east-1".to_string()),
+ ])
+ .build();
+
+ // Should be able to access S3 using the custom credential loader
+ assert!(file_io.exists("s3://bucket1/").await.unwrap());
+ }
+}