tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884798823


##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme 
for each store.
+//! This allows the user to extend DataFusion with different storage systems 
such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem, 
LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| 
DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: 
{}",
+                remaining
+            )));
+        }
+
+        // Always set path for consistency
+        parsed.set_path("/");
+        Ok(Self { url: parsed })
+    }
+
+    /// An [`ObjectStoreUrl`] for the local filesystem
+    pub fn local_filesystem() -> Self {
+        Self::parse("file://").unwrap()
+    }
+
+    /// Returns this [`ObjectStoreUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+}
+
+impl AsRef<str> for ObjectStoreUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ObjectStoreUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ObjectStoreUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+/// Object store registry
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to the object store that serves list / read operations 
for that scheme
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl std::fmt::Debug for ObjectStoreRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("ObjectStoreRegistry")
+            .field(
+                "schemes",
+                &self.object_stores.read().keys().collect::<Vec<_>>(),
+            )
+            .finish()
+    }
+}
+
+impl Default for ObjectStoreRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ObjectStoreRegistry {
+    /// Create the registry that object stores can be registered into.
+    /// [`LocalFileSystem`] store is registered by default to support reading 
local files natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store was already registered for the same scheme, it is replaced in the 
registry and the previous store is returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file://` or no scheme will return the default 
LocalFS store
+    /// - URL with scheme `s3://` will return the S3 store if it's registered
+    ///
+    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn 
ObjectStore>> {
+        let url = url.as_ref();
+        let stores = self.object_stores.read();
+        let store = stores.get(url.scheme()).map(Clone::clone).ok_or_else(|| {

Review Comment:
   Will update to use the member function



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to