tustvold commented on code in PR #2578: URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884798823
########## datafusion/core/src/datasource/object_store.rs: ########## @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store. +//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS +//! and query data inside these systems. 
+ +use datafusion_common::{DataFusionError, Result}; +use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME}; +use datafusion_data_access::object_store::ObjectStore; +use parking_lot::RwLock; +use std::collections::HashMap; +use std::sync::Arc; +use url::Url; + +/// A parsed URL identifying a particular [`ObjectStore`] +#[derive(Debug, Clone)] +pub struct ObjectStoreUrl { + url: Url, +} + +impl ObjectStoreUrl { + /// Parse an [`ObjectStoreUrl`] from a string + pub fn parse(s: impl AsRef<str>) -> Result<Self> { + let mut parsed = + Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?; + + let remaining = &parsed[url::Position::BeforePath..]; + if !remaining.is_empty() && remaining != "/" { + return Err(DataFusionError::Execution(format!( + "ObjectStoreUrl must only contain scheme and authority, got: {}", + remaining + ))); + } + + // Always set path for consistency + parsed.set_path("/"); + Ok(Self { url: parsed }) + } + + /// An [`ObjectStoreUrl`] for the local filesystem + pub fn local_filesystem() -> Self { + Self::parse("file://").unwrap() + } + + /// Returns this [`ObjectStoreUrl`] as a string + pub fn as_str(&self) -> &str { + self.as_ref() + } +} + +impl AsRef<str> for ObjectStoreUrl { + fn as_ref(&self) -> &str { + self.url.as_ref() + } +} + +impl AsRef<Url> for ObjectStoreUrl { + fn as_ref(&self) -> &Url { + &self.url + } +} + +impl std::fmt::Display for ObjectStoreUrl { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + self.as_str().fmt(f) + } +} + +/// Object store registry +pub struct ObjectStoreRegistry { + /// A map from scheme to object store that serve list / read operations for the store + pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>, +} + +impl std::fmt::Debug for ObjectStoreRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("ObjectStoreRegistry") + .field( + "schemes", + 
&self.object_stores.read().keys().collect::<Vec<_>>(), + ) + .finish() + } +} + +impl Default for ObjectStoreRegistry { + fn default() -> Self { + Self::new() + } +} + +impl ObjectStoreRegistry { + /// Create the registry that object stores can be registered into. + /// [`LocalFileSystem`] store is registered by default to support reading local files natively. + pub fn new() -> Self { + let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new(); + map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem)); + + Self { + object_stores: RwLock::new(map), + } + } + + /// Adds a new store to this registry. + /// If a store of the same scheme existed before, it is replaced in the registry and returned. + pub fn register_store( + &self, + scheme: String, + store: Arc<dyn ObjectStore>, + ) -> Option<Arc<dyn ObjectStore>> { + let mut stores = self.object_stores.write(); + stores.insert(scheme, store) + } + + /// Get the store registered for scheme + pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> { + let stores = self.object_stores.read(); + stores.get(scheme).cloned() + } + + /// Get a suitable store for the provided URL. For example: + /// + /// - URL with scheme `file://` or no scheme will return the default LocalFS store + /// - URL with scheme `s3://` will return the S3 store if it's registered + /// + pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> { + let url = url.as_ref(); + let stores = self.object_stores.read(); + let store = stores.get(url.scheme()).map(Clone::clone).ok_or_else(|| { Review Comment: Will update to use the member function -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org