andygrove commented on code in PR #1494:
URL: https://github.com/apache/datafusion-comet/pull/1494#discussion_r1992090834
##########
native/core/src/parquet/parquet_support.rs:
##########
@@ -199,38 +202,147 @@ fn cast_struct_to_struct(
     }
 }
 
-// Default object store which is local filesystem
+// Mirrors object_store::parse::parse_url for the hdfs object store
+#[cfg(feature = "hdfs")]
+fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    match datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
+    {
+        Some(object_store) => {
+            let path = object_store.get_path(url.as_str());
+            Ok((Box::new(object_store), path))
+        }
+        _ => {
+            return Err(object_store::Error::Generic {
+                store: "HadoopFileSystem",
+                source: Box::new(datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::HdfsErr::Generic(
+                    "Could not create hdfs object store".to_string(),
+                )),
+            });
+        }
+    }
+}
+
 #[cfg(not(feature = "hdfs"))]
-pub(crate) fn register_object_store(
-    session_context: Arc<SessionContext>,
-) -> Result<ObjectStoreUrl, ExecutionError> {
-    let object_store = object_store::local::LocalFileSystem::new();
-    let url = ObjectStoreUrl::parse("file://")?;
-    session_context
-        .runtime_env()
-        .register_object_store(url.as_ref(), Arc::new(object_store));
-    Ok(url)
+fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    Err(object_store::Error::Generic {
+        store: "HadoopFileSystem",
+        source: "Hdfs support is not enabled in this build".into(),
+    })
 }
 
-// HDFS object store
-#[cfg(feature = "hdfs")]
-pub(crate) fn register_object_store(
-    session_context: Arc<SessionContext>,
-) -> Result<ObjectStoreUrl, ExecutionError> {
-    // TODO: read the namenode configuration from file schema or from spark.defaultFS
-    let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?;
-    if let Some(object_store) =
-        datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
-    {
-        session_context
-            .runtime_env()
-            .register_object_store(url.as_ref(), Arc::new(object_store));
+
+/// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
+pub(crate) fn prepare_object_store(
+    runtime_env: Arc<RuntimeEnv>,
+    url: String,
+) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
+    let mut url = Url::parse(url.as_str())
+        .map_err(|e| ExecutionError::GeneralError(format!("Error parsing URL {url}: {e}")))?;
+    let mut scheme = url.scheme();
+    if scheme == "s3a" {
+        scheme = "s3";
+        url.set_scheme("s3").map_err(|_| {
+            ExecutionError::GeneralError("Could not convert scheme from s3a to s3".to_string())
+        })?;
+    }
+    let url_key = format!(
+        "{}://{}",
+        scheme,
+        &url[url::Position::BeforeHost..url::Position::AfterPort],
+    );
+
+    let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
+        parse_hdfs_url(&url)
+    } else {
+        parse_url(&url)
+    }
+    .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
+
+    let object_store_url = ObjectStoreUrl::parse(url_key.clone())?;
+    runtime_env.register_object_store(&url, Arc::from(object_store));
+    Ok((object_store_url, object_store_path))
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::execution::operators::ExecutionError;
+    use crate::parquet::parquet_support::prepare_object_store;
+    use datafusion_execution::object_store::ObjectStoreUrl;
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use object_store::path::Path;
+    use std::sync::Arc;
+    use url::Url;
+
+    #[test]
+    #[cfg(not(feature = "hdfs"))]
+    fn test_prepare_object_store() {
+        let local_file_system_url =
"file:///comet/spark-warehouse/part-00000.snappy.parquet"; + let s3_url = "s3a://test_bucket/comet/spark-warehouse/part-00000.snappy.parquet"; + let hdfs_url = "hdfs://localhost:8020/comet/spark-warehouse/part-00000.snappy.parquet"; + + let all_urls = [local_file_system_url, s3_url, hdfs_url]; + let expected: Vec<Result<(ObjectStoreUrl, Path), ExecutionError>> = vec![ + Ok(( + ObjectStoreUrl::parse("file://").unwrap(), + Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"), + )), + Ok(( + ObjectStoreUrl::parse("s3://test_bucket").unwrap(), + Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"), + )), + Err(ExecutionError::GeneralError( + "Generic HadoopFileSystem error: Hdfs support is not enabled in this build" + .parse() + .unwrap(), + )), + ]; - return Ok(url); + for (i, url_str) in all_urls.iter().enumerate() { + let url = &Url::parse(url_str).unwrap(); + let res = prepare_object_store(Arc::new(RuntimeEnv::default()), url.to_string()); + + let expected = expected.get(i).unwrap(); + match expected { + Ok((o, p)) => { + let (r_o, r_p) = res.unwrap(); + assert_eq!(r_o, *o); + assert_eq!(r_p, *p); + } + Err(e) => { + assert!(res.is_err()); + let Err(res_e) = res else { + panic!("test failed") + }; + assert_eq!(e.to_string(), res_e.to_string()) + } + } + } } - Err(ExecutionError::GeneralError(format!( - "HDFS object store cannot be created for {}", - url - ))) + #[test] + #[cfg(feature = "hdfs")] + fn test_prepare_object_store() { + let hdfs_url = "hdfs://localhost:8020/comet/spark-warehouse/part-00000.snappy.parquet"; + let expected: Result<(ObjectStoreUrl, Path), ExecutionError> = Ok(( + ObjectStoreUrl::parse("hdfs://localhost:8020").unwrap(), + Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"), + )); + + let url = &Url::parse(hdfs_url).unwrap(); + let res = prepare_object_store(Arc::new(RuntimeEnv::default()), url.to_string()); + + match expected { + Ok((o, p)) => { + let (r_o, r_p) = res.unwrap(); + assert_eq!(r_o, *o); + assert_eq!(r_p, *p); + } + Err(e) => { + assert!(res.is_err()); + let Err(res_e) = res else { + panic!("test failed") + }; + assert_eq!(e.to_string(), res_e.to_string()) + } Review Comment: Can this match arm ever be invoked? `expected` seems to always be `Ok` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org